diff options
author | Andrei Zeliankou <zelenkov@nginx.com> | 2023-06-14 18:20:09 +0100 |
---|---|---|
committer | Andrei Zeliankou <zelenkov@nginx.com> | 2023-06-14 18:20:09 +0100 |
commit | c183bd8749a19477390f8cb77efe5f6d223f0905 (patch) | |
tree | 4e821e9cb07be9a86bf2d442acb3ea6740ba5a99 /test/test_tls.py | |
parent | c6d05191a069ac150cc8eb2bece75cf79c0a465a (diff) | |
download | unit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.gz unit-c183bd8749a19477390f8cb77efe5f6d223f0905.tar.bz2 |
Tests: get rid of classes in test files.
Class usage came from the unittest framework and it was always redundant
after migration to the pytest. This commit removes classes from files
containing tests to make them more readable and understandable.
Diffstat (limited to '')
-rw-r--r-- | test/test_tls.py | 1017 |
1 files changed, 519 insertions, 498 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index ca9d5b07..54fdb665 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -4,51 +4,58 @@ import subprocess import time import pytest -from unit.applications.tls import TestApplicationTLS +from unit.applications.tls import ApplicationTLS from unit.option import option prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} - -class TestTLS(TestApplicationTLS): - def add_tls(self, application='empty', cert='default', port=7080): - assert 'success' in self.conf( - { - "pass": f"applications/{application}", - "tls": {"certificate": cert}, - }, - f'listeners/*:{port}', - ) - - def remove_tls(self, application='empty', port=7080): - assert 'success' in self.conf( - {"pass": f"applications/{application}"}, f'listeners/*:{port}' - ) - - def req(self, name='localhost', subject=None): - subj = subject if subject is not None else f'/CN={name}/' - - subprocess.check_output( - [ - 'openssl', - 'req', - '-new', - '-subj', - subj, - '-config', - f'{option.temp_dir}/openssl.conf', - '-out', - f'{option.temp_dir}/{name}.csr', - '-keyout', - f'{option.temp_dir}/{name}.key', - ], - stderr=subprocess.STDOUT, - ) - - def generate_ca_conf(self): - with open(f'{option.temp_dir}/ca.conf', 'w') as f: - f.write( - f"""[ ca ] +client = ApplicationTLS() + + +def add_tls(application='empty', cert='default', port=7080): + assert 'success' in client.conf( + { + "pass": f"applications/{application}", + "tls": {"certificate": cert}, + }, + f'listeners/*:{port}', + ) + + +def ca(cert='root', out='localhost'): + subprocess.check_output( + [ + 'openssl', + 'ca', + '-batch', + '-config', + f'{option.temp_dir}/ca.conf', + '-keyfile', + f'{option.temp_dir}/{cert}.key', + '-cert', + f'{option.temp_dir}/{cert}.crt', + '-in', + f'{option.temp_dir}/{out}.csr', + '-out', + f'{option.temp_dir}/{out}.crt', + ], + stderr=subprocess.STDOUT, + ) + + +def context_cert_req(cert='root'): + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(f'{option.temp_dir}/{cert}.crt') + + return context + + +def generate_ca_conf(): + with open(f'{option.temp_dir}/ca.conf', 'w') as f: + f.write( + f"""[ ca ] default_ca = myca [ myca ] @@ -66,618 +73,632 @@ commonName = optional [ myca_extensions ] basicConstraints = critical,CA:TRUE""" - ) - - with open(f'{option.temp_dir}/certserial', 'w') as f: - f.write('1000') - - with open(f'{option.temp_dir}/certindex', 'w') as f: - f.write('') - - with open(f'{option.temp_dir}/certindex.attr', 'w') as f: - f.write('') - - def ca(self, cert='root', out='localhost'): - subprocess.check_output( - [ - 'openssl', - 'ca', - '-batch', - '-config', - f'{option.temp_dir}/ca.conf', - '-keyfile', - f'{option.temp_dir}/{cert}.key', - '-cert', - f'{option.temp_dir}/{cert}.crt', - '-in', - f'{option.temp_dir}/{out}.csr', - '-out', - f'{option.temp_dir}/{out}.crt', - ], - stderr=subprocess.STDOUT, ) - def set_certificate_req_context(self, cert='root'): - self.context = ssl.create_default_context() - self.context.check_hostname = False - self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt') + with open(f'{option.temp_dir}/certserial', 'w') as f: + f.write('1000') - def test_tls_listener_option_add(self): - self.load('empty') + with open(f'{option.temp_dir}/certindex', 'w') as f: + f.write('') - self.certificate() + with open(f'{option.temp_dir}/certindex.attr', 'w') as f: + f.write('') - self.add_tls() - assert self.get_ssl()['status'] == 200, 'add listener option' +def remove_tls(application='empty', port=7080): + assert 'success' in client.conf( + {"pass": f"applications/{application}"}, f'listeners/*:{port}' + ) - def test_tls_listener_option_remove(self): - self.load('empty') - self.certificate() +def req(name='localhost', subject=None): + subj = subject if subject is not None else f'/CN={name}/' - self.add_tls() + subprocess.check_output( + [ + 'openssl', + 'req', + '-new', + '-subj', + subj, + '-config', + f'{option.temp_dir}/openssl.conf', + '-out', + f'{option.temp_dir}/{name}.csr', + '-keyout', + f'{option.temp_dir}/{name}.key', + ], + stderr=subprocess.STDOUT, + ) - self.get_ssl() - self.remove_tls() +def test_tls_listener_option_add(): + client.load('empty') - assert self.get()['status'] == 200, 'remove listener option' + client.certificate() - def test_tls_certificate_remove(self): - self.load('empty') + add_tls() - self.certificate() + assert client.get_ssl()['status'] == 200, 'add listener option' - assert 'success' in self.conf_delete( - '/certificates/default' - ), 'remove certificate' - def test_tls_certificate_remove_used(self): - self.load('empty') +def test_tls_listener_option_remove(): + client.load('empty') - self.certificate() + client.certificate() - self.add_tls() + add_tls() - assert 'error' in self.conf_delete( - '/certificates/default' - ), 'remove certificate' + client.get_ssl() - def test_tls_certificate_remove_nonexisting(self): - self.load('empty') + remove_tls() - self.certificate() + assert client.get()['status'] == 200, 'remove listener option' - self.add_tls() - assert 'error' in self.conf_delete( - '/certificates/blah' - ), 'remove nonexistings certificate' +def test_tls_certificate_remove(): + client.load('empty') - @pytest.mark.skip('not yet') - def test_tls_certificate_update(self): - self.load('empty') + client.certificate() - self.certificate() + assert 'success' in client.conf_delete( + '/certificates/default' + ), 'remove certificate' - self.add_tls() - cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) +def test_tls_certificate_remove_used(): + client.load('empty') - self.certificate() + client.certificate() - assert cert_old != ssl.get_server_certificate( - ('127.0.0.1', 7080) - ), 'update certificate' + add_tls() - @pytest.mark.skip('not yet') - def test_tls_certificate_key_incorrect(self): - self.load('empty') + assert 'error' in client.conf_delete( + '/certificates/default' + ), 'remove certificate' - self.certificate('first', False) - self.certificate('second', False) - assert 'error' in self.certificate_load( - 'first', 'second' - ), 'key incorrect' +def test_tls_certificate_remove_nonexisting(): + client.load('empty') - def test_tls_certificate_change(self): - self.load('empty') + client.certificate() - self.certificate() - self.certificate('new') + add_tls() - self.add_tls() + assert 'error' in client.conf_delete( + '/certificates/blah' + ), 'remove nonexistings certificate' - cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - self.add_tls(cert='new') +@pytest.mark.skip('not yet') +def test_tls_certificate_update(): + client.load('empty') - assert cert_old != ssl.get_server_certificate( - ('127.0.0.1', 7080) - ), 'change certificate' + client.certificate() - def test_tls_certificate_key_rsa(self): - self.load('empty') + add_tls() - self.certificate() + cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - assert ( - self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' - ), 'certificate key rsa' + client.certificate() - def test_tls_certificate_key_ec(self, temp_dir): - self.load('empty') + assert cert_old != ssl.get_server_certificate( + ('127.0.0.1', 7080) + ), 'update certificate' - self.openssl_conf() - subprocess.check_output( - [ - 'openssl', - 'ecparam', - '-noout', - '-genkey', - '-out', - f'{temp_dir}/ec.key', - '-name', - 'prime256v1', - ], - stderr=subprocess.STDOUT, - ) +@pytest.mark.skip('not yet') +def test_tls_certificate_key_incorrect(): + client.load('empty') - subprocess.check_output( - [ - 'openssl', - 'req', - '-x509', - '-new', - '-subj', - '/CN=ec/', - '-config', - f'{temp_dir}/openssl.conf', - '-key', - f'{temp_dir}/ec.key', - '-out', - f'{temp_dir}/ec.crt', - ], - stderr=subprocess.STDOUT, - ) + client.certificate('first', False) + client.certificate('second', False) - self.certificate_load('ec') + assert 'error' in client.certificate_load( + 'first', 'second' + ), 'key incorrect' - assert ( - self.conf_get('/certificates/ec/key') == 'ECDH' - ), 'certificate key ec' - def test_tls_certificate_chain_options(self, date_to_sec_epoch, sec_epoch): - self.load('empty') - date_format = '%b %d %X %Y %Z' +def test_tls_certificate_change(): + client.load('empty') - self.certificate() + client.certificate() + client.certificate('new') - chain = self.conf_get('/certificates/default/chain') + add_tls() - assert len(chain) == 1, 'certificate chain length' + cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - cert = chain[0] + add_tls(cert='new') - assert ( - cert['subject']['common_name'] == 'default' - ), 'certificate subject common name' - assert ( - cert['issuer']['common_name'] == 'default' - ), 'certificate issuer common name' + assert cert_old != ssl.get_server_certificate( + ('127.0.0.1', 7080) + ), 'change certificate' - assert ( - abs( - sec_epoch - - date_to_sec_epoch(cert['validity']['since'], date_format) - ) - < 60 - ), 'certificate validity since' - assert ( - date_to_sec_epoch(cert['validity']['until'], date_format) - - date_to_sec_epoch(cert['validity']['since'], date_format) - == 2592000 - ), 'certificate validity until' - def test_tls_certificate_chain(self, temp_dir): - self.load('empty') +def test_tls_certificate_key_rsa(): + client.load('empty') + + client.certificate() + + assert ( + client.conf_get('/certificates/default/key') == 'RSA (2048 bits)' + ), 'certificate key rsa' + - self.certificate('root', False) +def test_tls_certificate_key_ec(temp_dir): + client.load('empty') - self.req('int') - self.req('end') + client.openssl_conf() - self.generate_ca_conf() + subprocess.check_output( + [ + 'openssl', + 'ecparam', + '-noout', + '-genkey', + '-out', + f'{temp_dir}/ec.key', + '-name', + 'prime256v1', + ], + stderr=subprocess.STDOUT, + ) - self.ca(cert='root', out='int') - self.ca(cert='int', out='end') + subprocess.check_output( + [ + 'openssl', + 'req', + '-x509', + '-new', + '-subj', + '/CN=ec/', + '-config', + f'{temp_dir}/openssl.conf', + '-key', + f'{temp_dir}/ec.key', + '-out', + f'{temp_dir}/ec.crt', + ], + stderr=subprocess.STDOUT, + ) - crt_path = f'{temp_dir}/end-int.crt' - end_path = f'{temp_dir}/end.crt' - int_path = f'{temp_dir}/int.crt' - - with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( - int_path, 'rb' - ) as int: - crt.write(end.read() + int.read()) - - self.set_certificate_req_context() - - # incomplete chain - - assert 'success' in self.certificate_load( - 'end', 'end' - ), 'certificate chain end upload' + client.certificate_load('ec') - chain = self.conf_get('/certificates/end/chain') - assert len(chain) == 1, 'certificate chain end length' - assert ( - chain[0]['subject']['common_name'] == 'end' - ), 'certificate chain end subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'int' - ), 'certificate chain end issuer common name' + assert ( + client.conf_get('/certificates/ec/key') == 'ECDH' + ), 'certificate key ec' - self.add_tls(cert='end') - try: - resp = self.get_ssl() - except ssl.SSLError: - resp = None +def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch): + client.load('empty') + date_format = '%b %d %X %Y %Z' - assert resp is None, 'certificate chain incomplete chain' + client.certificate() - # intermediate + chain = client.conf_get('/certificates/default/chain') - assert 'success' in self.certificate_load( - 'int', 'int' - ), 'certificate chain int upload' + assert len(chain) == 1, 'certificate chain length' - chain = self.conf_get('/certificates/int/chain') - assert len(chain) == 1, 'certificate chain int length' - assert ( - chain[0]['subject']['common_name'] == 'int' - ), 'certificate chain int subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'root' - ), 'certificate chain int issuer common name' + cert = chain[0] - self.add_tls(cert='int') + assert ( + cert['subject']['common_name'] == 'default' + ), 'certificate subject common name' + assert ( + cert['issuer']['common_name'] == 'default' + ), 'certificate issuer common name' - assert self.get_ssl()['status'] == 200, 'certificate chain intermediate' + assert ( + abs( + sec_epoch + - date_to_sec_epoch(cert['validity']['since'], date_format) + ) + < 60 + ), 'certificate validity since' + assert ( + date_to_sec_epoch(cert['validity']['until'], date_format) + - date_to_sec_epoch(cert['validity']['since'], date_format) + == 2592000 + ), 'certificate validity until' - # intermediate server - assert 'success' in self.certificate_load( - 'end-int', 'end' - ), 'certificate chain end-int upload' +def test_tls_certificate_chain(temp_dir): + client.load('empty') - chain = self.conf_get('/certificates/end-int/chain') - assert len(chain) == 2, 'certificate chain end-int length' - assert ( - chain[0]['subject']['common_name'] == 'end' - ), 'certificate chain end-int int subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'int' - ), 'certificate chain end-int int issuer common name' - assert ( - chain[1]['subject']['common_name'] == 'int' - ), 'certificate chain end-int end subject common name' - assert ( - chain[1]['issuer']['common_name'] == 'root' - ), 'certificate chain end-int end issuer common name' + client.certificate('root', False) - self.add_tls(cert='end-int') + req('int') + req('end') - assert ( - self.get_ssl()['status'] == 200 - ), 'certificate chain intermediate server' + generate_ca_conf() - def test_tls_certificate_chain_long(self, temp_dir): - self.load('empty') + ca(cert='root', out='int') + ca(cert='int', out='end') - self.generate_ca_conf() + crt_path = f'{temp_dir}/end-int.crt' + end_path = f'{temp_dir}/end.crt' + int_path = f'{temp_dir}/int.crt' + + with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( + int_path, 'rb' + ) as int: + crt.write(end.read() + int.read()) + + # incomplete chain + + assert 'success' in client.certificate_load( + 'end', 'end' + ), 'certificate chain end upload' + + chain = client.conf_get('/certificates/end/chain') + assert len(chain) == 1, 'certificate chain end length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end issuer common name' + + add_tls(cert='end') + + ctx_cert_req = context_cert_req() + try: + resp = client.get_ssl(context=ctx_cert_req) + except ssl.SSLError: + resp = None + + assert resp is None, 'certificate chain incomplete chain' + + # intermediate + + assert 'success' in client.certificate_load( + 'int', 'int' + ), 'certificate chain int upload' + + chain = client.conf_get('/certificates/int/chain') + assert len(chain) == 1, 'certificate chain int length' + assert ( + chain[0]['subject']['common_name'] == 'int' + ), 'certificate chain int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'root' + ), 'certificate chain int issuer common name' + + add_tls(cert='int') + + assert client.get_ssl()['status'] == 200, 'certificate chain intermediate' + + # intermediate server + + assert 'success' in client.certificate_load( + 'end-int', 'end' + ), 'certificate chain end-int upload' + + chain = client.conf_get('/certificates/end-int/chain') + assert len(chain) == 2, 'certificate chain end-int length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end-int int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end-int int issuer common name' + assert ( + chain[1]['subject']['common_name'] == 'int' + ), 'certificate chain end-int end subject common name' + assert ( + chain[1]['issuer']['common_name'] == 'root' + ), 'certificate chain end-int end issuer common name' + + add_tls(cert='end-int') + + assert ( + client.get_ssl(context=ctx_cert_req)['status'] == 200 + ), 'certificate chain intermediate server' + + +def test_tls_certificate_chain_long(temp_dir): + client.load('empty') - # Minimum chain length is 3. - chain_length = 10 + generate_ca_conf() - for i in range(chain_length): - if i == 0: - self.certificate('root', False) - elif i == chain_length - 1: - self.req('end') - else: - self.req(f'int{i}') + # Minimum chain length is 3. + chain_length = 10 + + for i in range(chain_length): + if i == 0: + client.certificate('root', False) + elif i == chain_length - 1: + req('end') + else: + req(f'int{i}') + + for i in range(chain_length - 1): + if i == 0: + ca(cert='root', out='int1') + elif i == chain_length - 2: + ca(cert=f'int{(chain_length - 2)}', out='end') + else: + ca(cert=f'int{i}', out=f'int{(i + 1)}') + + for i in range(chain_length - 1, 0, -1): + path = ( + f'{temp_dir}/end.crt' + if i == chain_length - 1 + else f'{temp_dir}/int{i}.crt' + ) - for i in range(chain_length - 1): - if i == 0: - self.ca(cert='root', out='int1') - elif i == chain_length - 2: - self.ca(cert=f'int{(chain_length - 2)}', out='end') - else: - self.ca(cert=f'int{i}', out=f'int{(i + 1)}') + with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert: + chain.write(cert.read()) - for i in range(chain_length - 1, 0, -1): - path = ( - f'{temp_dir}/end.crt' - if i == chain_length - 1 - else f'{temp_dir}/int{i}.crt' - ) + assert 'success' in client.certificate_load( + 'all', 'end' + ), 'certificate chain upload' - with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert: - chain.write(cert.read()) + chain = client.conf_get('/certificates/all/chain') + assert len(chain) == chain_length - 1, 'certificate chain length' - self.set_certificate_req_context() + add_tls(cert='all') - assert 'success' in self.certificate_load( - 'all', 'end' - ), 'certificate chain upload' + assert ( + client.get_ssl(context=context_cert_req())['status'] == 200 + ), 'certificate chain long' - chain = self.conf_get('/certificates/all/chain') - assert len(chain) == chain_length - 1, 'certificate chain length' - self.add_tls(cert='all') +def test_tls_certificate_empty_cn(): + client.certificate('root', False) - assert self.get_ssl()['status'] == 200, 'certificate chain long' + req(subject='/') - def test_tls_certificate_empty_cn(self): - self.certificate('root', False) + generate_ca_conf() + ca() - self.req(subject='/') + assert 'success' in client.certificate_load('localhost', 'localhost') - self.generate_ca_conf() - self.ca() + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == {}, 'empty subject' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - self.set_certificate_req_context() - assert 'success' in self.certificate_load('localhost', 'localhost') +def test_tls_certificate_empty_cn_san(): + client.certificate('root', False) - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == {}, 'empty subject' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + client.openssl_conf( + rewrite=True, alt_names=["example.com", "www.example.net"] + ) - def test_tls_certificate_empty_cn_san(self): - self.certificate('root', False) + req(subject='/') - self.openssl_conf( - rewrite=True, alt_names=["example.com", "www.example.net"] - ) + generate_ca_conf() + ca() - self.req(subject='/') + assert 'success' in client.certificate_load('localhost', 'localhost') - self.generate_ca_conf() - self.ca() + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - self.set_certificate_req_context() - assert 'success' in self.certificate_load('localhost', 'localhost') +def test_tls_certificate_empty_cn_san_ip(): + client.certificate('root', False) - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == { - 'alt_names': ['example.com', 'www.example.net'] - }, 'subject alt_names' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + client.openssl_conf( + rewrite=True, + alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], + ) - def test_tls_certificate_empty_cn_san_ip(self): - self.certificate('root', False) + req(subject='/') - self.openssl_conf( - rewrite=True, - alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], - ) + generate_ca_conf() + ca() - self.req(subject='/') + assert 'success' in client.certificate_load('localhost', 'localhost') - self.generate_ca_conf() - self.ca() + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - self.set_certificate_req_context() - assert 'success' in self.certificate_load('localhost', 'localhost') +def test_tls_keepalive(): + client.load('mirror') - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == { - 'alt_names': ['example.com', 'www.example.net'] - }, 'subject alt_names' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' + assert client.get()['status'] == 200, 'init' - def test_tls_keepalive(self): - self.load('mirror') + client.certificate() - assert self.get()['status'] == 200, 'init' + add_tls(application='mirror') - self.certificate() + (resp, sock) = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body='0123456789', + read_timeout=1, + ) - self.add_tls(application='mirror') + assert resp['body'] == '0123456789', 'keepalive 1' - (resp, sock) = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body='0123456789', - read_timeout=1, - ) + resp = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'close', + }, + sock=sock, + body='0123456789', + ) - assert resp['body'] == '0123456789', 'keepalive 1' + assert resp['body'] == '0123456789', 'keepalive 2' - resp = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'close', + +def test_tls_no_close_notify(): + client.certificate() + + assert 'success' in client.conf( + { + "listeners": { + "*:7080": { + "pass": "routes", + "tls": {"certificate": "default"}, + } }, - sock=sock, - body='0123456789', - ) + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' - assert resp['body'] == '0123456789', 'keepalive 2' - - def test_tls_no_close_notify(self): - self.certificate() - - assert 'success' in self.conf( - { - "listeners": { - "*:7080": { - "pass": "routes", - "tls": {"certificate": "default"}, - } - }, - "routes": [{"action": {"return": 200}}], - "applications": {}, - } - ), 'load application configuration' + (_, sock) = client.get_ssl(start=True) - (_, sock) = self.get_ssl(start=True) + time.sleep(5) - time.sleep(5) + sock.close() - sock.close() - @pytest.mark.skip('not yet') - def test_tls_keepalive_certificate_remove(self): - self.load('empty') +@pytest.mark.skip('not yet') +def test_tls_keepalive_certificate_remove(): + client.load('empty') - assert self.get()['status'] == 200, 'init' + assert client.get()['status'] == 200, 'init' - self.certificate() + client.certificate() - self.add_tls() + add_tls() - (resp, sock) = self.get_ssl( - headers={'Host': 'localhost', 'Connection': 'keep-alive'}, - start=True, - read_timeout=1, - ) + (resp, sock) = client.get_ssl( + headers={'Host': 'localhost', 'Connection': 'keep-alive'}, + start=True, + read_timeout=1, + ) - assert 'success' in self.conf( - {"pass": "applications/empty"}, 'listeners/*:7080' - ) - assert 'success' in self.conf_delete('/certificates/default') + assert 'success' in client.conf( + {"pass": "applications/empty"}, 'listeners/*:7080' + ) + assert 'success' in client.conf_delete('/certificates/default') - try: - resp = self.get_ssl(sock=sock) + try: + resp = client.get_ssl(sock=sock) - except KeyboardInterrupt: - raise + except KeyboardInterrupt: + raise - except: - resp = None + except: + resp = None - assert resp is None, 'keepalive remove certificate' + assert resp is None, 'keepalive remove certificate' - @pytest.mark.skip('not yet') - def test_tls_certificates_remove_all(self): - self.load('empty') - self.certificate() +@pytest.mark.skip('not yet') +def test_tls_certificates_remove_all(): + client.load('empty') - assert 'success' in self.conf_delete( - '/certificates' - ), 'remove all certificates' + client.certificate() - def test_tls_application_respawn( - self, findall, skip_alert, wait_for_record - ): - self.load('mirror') + assert 'success' in client.conf_delete( + '/certificates' + ), 'remove all certificates' - self.certificate() - assert 'success' in self.conf('1', 'applications/mirror/processes') +def test_tls_application_respawn(findall, skip_alert, wait_for_record): + client.load('mirror') - self.add_tls(application='mirror') + client.certificate() - (_, sock) = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body='0123456789', - read_timeout=1, - ) + assert 'success' in client.conf('1', 'applications/mirror/processes') - app_id = findall(r'(\d+)#\d+ "mirror" application started')[0] + add_tls(application='mirror') - subprocess.check_output(['kill', '-9', app_id]) + (_, sock) = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body='0123456789', + read_timeout=1, + ) - skip_alert(fr'process {app_id} exited on signal 9') + app_id = findall(r'(\d+)#\d+ "mirror" application started')[0] - wait_for_record( - fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started' - ) + subprocess.check_output(['kill', '-9', app_id]) - resp = self.post_ssl(sock=sock, body='0123456789') + skip_alert(fr'process {app_id} exited on signal 9') - assert resp['status'] == 200, 'application respawn status' - assert resp['body'] == '0123456789', 'application respawn body' + wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started') - def test_tls_url_scheme(self): - self.load('variables') + resp = client.post_ssl(sock=sock, body='0123456789') - assert ( - self.post( - headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close', - } - )['headers']['Wsgi-Url-Scheme'] - == 'http' - ), 'url scheme http' + assert resp['status'] == 200, 'application respawn status' + assert resp['body'] == '0123456789', 'application respawn body' - self.certificate() - self.add_tls(application='variables') +def test_tls_url_scheme(): + client.load('variables') - assert ( - self.post_ssl( - headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close', - } - )['headers']['Wsgi-Url-Scheme'] - == 'https' - ), 'url scheme https' + assert ( + client.post( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'] + == 'http' + ), 'url scheme http' - def test_tls_big_upload(self): - self.load('upload') + client.certificate() - self.certificate() + add_tls(application='variables') - self.add_tls(application='upload') + assert ( + client.post_ssl( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'] + == 'https' + ), 'url scheme https' - filename = 'test.txt' - data = '0123456789' * 9000 - res = self.post_ssl( - body={ - 'file': { - 'filename': filename, - 'type': 'text/plain', - 'data': io.StringIO(data), - } +def test_tls_big_upload(): + client.load('upload') + + client.certificate() + + add_tls(application='upload') + + filename = 'test.txt' + data = '0123456789' * 9000 + + res = client.post_ssl( + body={ + 'file': { + 'filename': filename, + 'type': 'text/plain', + 'data': io.StringIO(data), } - ) - assert res['status'] == 200, 'status ok' - assert res['body'] == f'{filename}{data}' + } + ) + assert res['status'] == 200, 'status ok' + assert res['body'] == f'{filename}{data}' + - def test_tls_multi_listener(self): - self.load('empty') +def test_tls_multi_listener(): + client.load('empty') - self.certificate() + client.certificate() - self.add_tls() - self.add_tls(port=7081) + add_tls() + add_tls(port=7081) - assert self.get_ssl()['status'] == 200, 'listener #1' + assert client.get_ssl()['status'] == 200, 'listener #1' - assert self.get_ssl(port=7081)['status'] == 200, 'listener #2' + assert client.get_ssl(port=7081)['status'] == 200, 'listener #2' |