diff options
Diffstat (limited to 'test/test_tls.py')
-rw-r--r-- | test/test_tls.py | 1019 |
1 files changed, 520 insertions, 499 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index 06c38d0b..54fdb665 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -4,54 +4,58 @@ import subprocess import time import pytest -from unit.applications.tls import TestApplicationTLS +from unit.applications.tls import ApplicationTLS from unit.option import option - -class TestTLS(TestApplicationTLS): - prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} - - def openssl_date_to_sec_epoch(self, date): - return self.date_to_sec_epoch(date, '%b %d %X %Y %Z') - - def add_tls(self, application='empty', cert='default', port=7080): - assert 'success' in self.conf( - { - "pass": f"applications/{application}", - "tls": {"certificate": cert}, - }, - f'listeners/*:{port}', - ) - - def remove_tls(self, application='empty', port=7080): - assert 'success' in self.conf( - {"pass": f"applications/{application}"}, f'listeners/*:{port}' - ) - - def req(self, name='localhost', subject=None, x509=False): - subj = subject if subject is not None else f'/CN={name}/' - - subprocess.check_output( - [ - 'openssl', - 'req', - '-new', - '-subj', - subj, - '-config', - f'{option.temp_dir}/openssl.conf', - '-out', - f'{option.temp_dir}/{name}.csr', - '-keyout', - f'{option.temp_dir}/{name}.key', - ], - stderr=subprocess.STDOUT, - ) - - def generate_ca_conf(self): - with open(f'{option.temp_dir}/ca.conf', 'w') as f: - f.write( - f"""[ ca ] +prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} + +client = ApplicationTLS() + + +def add_tls(application='empty', cert='default', port=7080): + assert 'success' in client.conf( + { + "pass": f"applications/{application}", + "tls": {"certificate": cert}, + }, + f'listeners/*:{port}', + ) + + +def ca(cert='root', out='localhost'): + subprocess.check_output( + [ + 'openssl', + 'ca', + '-batch', + '-config', + f'{option.temp_dir}/ca.conf', + '-keyfile', + f'{option.temp_dir}/{cert}.key', + '-cert', + f'{option.temp_dir}/{cert}.crt', + '-in', + f'{option.temp_dir}/{out}.csr', + '-out', + f'{option.temp_dir}/{out}.crt', + ], + stderr=subprocess.STDOUT, + ) + + +def context_cert_req(cert='root'): + context = ssl.create_default_context() + context.check_hostname = False + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(f'{option.temp_dir}/{cert}.crt') + + return context + + +def generate_ca_conf(): + with open(f'{option.temp_dir}/ca.conf', 'w') as f: + f.write( + f"""[ ca ] default_ca = myca [ myca ] @@ -69,615 +73,632 @@ commonName = optional [ myca_extensions ] basicConstraints = critical,CA:TRUE""" - ) - - with open(f'{option.temp_dir}/certserial', 'w') as f: - f.write('1000') - - with open(f'{option.temp_dir}/certindex', 'w') as f: - f.write('') - - with open(f'{option.temp_dir}/certindex.attr', 'w') as f: - f.write('') - - def ca(self, cert='root', out='localhost'): - subprocess.check_output( - [ - 'openssl', - 'ca', - '-batch', - '-config', - f'{option.temp_dir}/ca.conf', - '-keyfile', - f'{option.temp_dir}/{cert}.key', - '-cert', - f'{option.temp_dir}/{cert}.crt', - '-in', - f'{option.temp_dir}/{out}.csr', - '-out', - f'{option.temp_dir}/{out}.crt', - ], - stderr=subprocess.STDOUT, ) - def set_certificate_req_context(self, cert='root'): - self.context = ssl.create_default_context() - self.context.check_hostname = False - self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt') + with open(f'{option.temp_dir}/certserial', 'w') as f: + f.write('1000') - def test_tls_listener_option_add(self): - self.load('empty') + with open(f'{option.temp_dir}/certindex', 'w') as f: + f.write('') - self.certificate() + with open(f'{option.temp_dir}/certindex.attr', 'w') as f: + f.write('') - self.add_tls() - assert self.get_ssl()['status'] == 200, 'add listener option' +def remove_tls(application='empty', port=7080): + assert 'success' in client.conf( + {"pass": f"applications/{application}"}, f'listeners/*:{port}' + ) - def test_tls_listener_option_remove(self): - self.load('empty') - self.certificate() +def req(name='localhost', subject=None): + subj = subject if subject is not None else f'/CN={name}/' - self.add_tls() + subprocess.check_output( + [ + 'openssl', + 'req', + '-new', + '-subj', + subj, + '-config', + f'{option.temp_dir}/openssl.conf', + '-out', + f'{option.temp_dir}/{name}.csr', + '-keyout', + f'{option.temp_dir}/{name}.key', + ], + stderr=subprocess.STDOUT, + ) - self.get_ssl() - self.remove_tls() +def test_tls_listener_option_add(): + client.load('empty') - assert self.get()['status'] == 200, 'remove listener option' + client.certificate() - def test_tls_certificate_remove(self): - self.load('empty') + add_tls() - self.certificate() + assert client.get_ssl()['status'] == 200, 'add listener option' - assert 'success' in self.conf_delete( - '/certificates/default' - ), 'remove certificate' - def test_tls_certificate_remove_used(self): - self.load('empty') +def test_tls_listener_option_remove(): + client.load('empty') - self.certificate() + client.certificate() - self.add_tls() + add_tls() - assert 'error' in self.conf_delete( - '/certificates/default' - ), 'remove certificate' + client.get_ssl() - def test_tls_certificate_remove_nonexisting(self): - self.load('empty') + remove_tls() - self.certificate() + assert client.get()['status'] == 200, 'remove listener option' - self.add_tls() - assert 'error' in self.conf_delete( - '/certificates/blah' - ), 'remove nonexistings certificate' +def test_tls_certificate_remove(): + client.load('empty') - @pytest.mark.skip('not yet') - def test_tls_certificate_update(self): - self.load('empty') + client.certificate() - self.certificate() + assert 'success' in client.conf_delete( + '/certificates/default' + ), 'remove certificate' - self.add_tls() - cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) +def test_tls_certificate_remove_used(): + client.load('empty') - self.certificate() + client.certificate() - assert cert_old != ssl.get_server_certificate( - ('127.0.0.1', 7080) - ), 'update certificate' + add_tls() - @pytest.mark.skip('not yet') - def test_tls_certificate_key_incorrect(self): - self.load('empty') + assert 'error' in client.conf_delete( + '/certificates/default' + ), 'remove certificate' - self.certificate('first', False) - self.certificate('second', False) - assert 'error' in self.certificate_load( - 'first', 'second' - ), 'key incorrect' +def test_tls_certificate_remove_nonexisting(): + client.load('empty') - def test_tls_certificate_change(self): - self.load('empty') + client.certificate() - self.certificate() - self.certificate('new') + add_tls() - self.add_tls() + assert 'error' in client.conf_delete( + '/certificates/blah' + ), 'remove nonexistings certificate' - cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - self.add_tls(cert='new') +@pytest.mark.skip('not yet') +def test_tls_certificate_update(): + client.load('empty') - assert cert_old != ssl.get_server_certificate( - ('127.0.0.1', 7080) - ), 'change certificate' + client.certificate() - def test_tls_certificate_key_rsa(self): - self.load('empty') + add_tls() - self.certificate() + cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - assert ( - self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' - ), 'certificate key rsa' + client.certificate() - def test_tls_certificate_key_ec(self, temp_dir): - self.load('empty') + assert cert_old != ssl.get_server_certificate( + ('127.0.0.1', 7080) + ), 'update certificate' - self.openssl_conf() - subprocess.check_output( - [ - 'openssl', - 'ecparam', - '-noout', - '-genkey', - '-out', - f'{temp_dir}/ec.key', - '-name', - 'prime256v1', - ], - stderr=subprocess.STDOUT, - ) +@pytest.mark.skip('not yet') +def test_tls_certificate_key_incorrect(): + client.load('empty') - subprocess.check_output( - [ - 'openssl', - 'req', - '-x509', - '-new', - '-subj', - '/CN=ec/', - '-config', - f'{temp_dir}/openssl.conf', - '-key', - f'{temp_dir}/ec.key', - '-out', - f'{temp_dir}/ec.crt', - ], - stderr=subprocess.STDOUT, - ) + client.certificate('first', False) + client.certificate('second', False) - self.certificate_load('ec') + assert 'error' in client.certificate_load( + 'first', 'second' + ), 'key incorrect' - assert ( - self.conf_get('/certificates/ec/key') == 'ECDH' - ), 'certificate key ec' - def test_tls_certificate_chain_options(self): - self.load('empty') +def test_tls_certificate_change(): + client.load('empty') - self.certificate() + client.certificate() + client.certificate('new') - chain = self.conf_get('/certificates/default/chain') + add_tls() - assert len(chain) == 1, 'certificate chain length' + cert_old = ssl.get_server_certificate(('127.0.0.1', 7080)) - cert = chain[0] + add_tls(cert='new') - assert ( - cert['subject']['common_name'] == 'default' - ), 'certificate subject common name' - assert ( - cert['issuer']['common_name'] == 'default' - ), 'certificate issuer common name' + assert cert_old != ssl.get_server_certificate( + ('127.0.0.1', 7080) + ), 'change certificate' - assert ( - abs( - self.sec_epoch() - - self.openssl_date_to_sec_epoch(cert['validity']['since']) - ) - < 60 - ), 'certificate validity since' - assert ( - self.openssl_date_to_sec_epoch(cert['validity']['until']) - - self.openssl_date_to_sec_epoch(cert['validity']['since']) - == 2592000 - ), 'certificate validity until' - def test_tls_certificate_chain(self, temp_dir): - self.load('empty') +def test_tls_certificate_key_rsa(): + client.load('empty') - self.certificate('root', False) + client.certificate() - self.req('int') - self.req('end') + assert ( + client.conf_get('/certificates/default/key') == 'RSA (2048 bits)' + ), 'certificate key rsa' - self.generate_ca_conf() - self.ca(cert='root', out='int') - self.ca(cert='int', out='end') +def test_tls_certificate_key_ec(temp_dir): + client.load('empty') - crt_path = f'{temp_dir}/end-int.crt' - end_path = f'{temp_dir}/end.crt' - int_path = f'{temp_dir}/int.crt' + client.openssl_conf() - with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( - int_path, 'rb' - ) as int: - crt.write(end.read() + int.read()) - - self.set_certificate_req_context() - - # incomplete chain - - assert 'success' in self.certificate_load( - 'end', 'end' - ), 'certificate chain end upload' + subprocess.check_output( + [ + 'openssl', + 'ecparam', + '-noout', + '-genkey', + '-out', + f'{temp_dir}/ec.key', + '-name', + 'prime256v1', + ], + stderr=subprocess.STDOUT, + ) - chain = self.conf_get('/certificates/end/chain') - assert len(chain) == 1, 'certificate chain end length' - assert ( - chain[0]['subject']['common_name'] == 'end' - ), 'certificate chain end subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'int' - ), 'certificate chain end issuer common name' + subprocess.check_output( + [ + 'openssl', + 'req', + '-x509', + '-new', + '-subj', + '/CN=ec/', + '-config', + f'{temp_dir}/openssl.conf', + '-key', + f'{temp_dir}/ec.key', + '-out', + f'{temp_dir}/ec.crt', + ], + stderr=subprocess.STDOUT, + ) - self.add_tls(cert='end') + client.certificate_load('ec') - try: - resp = self.get_ssl() - except ssl.SSLError: - resp = None + assert ( + client.conf_get('/certificates/ec/key') == 'ECDH' + ), 'certificate key ec' - assert resp == None, 'certificate chain incomplete chain' - # intermediate +def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch): + client.load('empty') + date_format = '%b %d %X %Y %Z' - assert 'success' in self.certificate_load( - 'int', 'int' - ), 'certificate chain int upload' + client.certificate() - chain = self.conf_get('/certificates/int/chain') - assert len(chain) == 1, 'certificate chain int length' - assert ( - chain[0]['subject']['common_name'] == 'int' - ), 'certificate chain int subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'root' - ), 'certificate chain int issuer common name' + chain = client.conf_get('/certificates/default/chain') - self.add_tls(cert='int') + assert len(chain) == 1, 'certificate chain length' - assert self.get_ssl()['status'] == 200, 'certificate chain intermediate' + cert = chain[0] - # intermediate server + assert ( + cert['subject']['common_name'] == 'default' + ), 'certificate subject common name' + assert ( + cert['issuer']['common_name'] == 'default' + ), 'certificate issuer common name' - assert 'success' in self.certificate_load( - 'end-int', 'end' - ), 'certificate chain end-int upload' + assert ( + abs( + sec_epoch + - date_to_sec_epoch(cert['validity']['since'], date_format) + ) + < 60 + ), 'certificate validity since' + assert ( + date_to_sec_epoch(cert['validity']['until'], date_format) + - date_to_sec_epoch(cert['validity']['since'], date_format) + == 2592000 + ), 'certificate validity until' - chain = self.conf_get('/certificates/end-int/chain') - assert len(chain) == 2, 'certificate chain end-int length' - assert ( - chain[0]['subject']['common_name'] == 'end' - ), 'certificate chain end-int int subject common name' - assert ( - chain[0]['issuer']['common_name'] == 'int' - ), 'certificate chain end-int int issuer common name' - assert ( - chain[1]['subject']['common_name'] == 'int' - ), 'certificate chain end-int end subject common name' - assert ( - chain[1]['issuer']['common_name'] == 'root' - ), 'certificate chain end-int end issuer common name' - self.add_tls(cert='end-int') +def test_tls_certificate_chain(temp_dir): + client.load('empty') - assert ( - self.get_ssl()['status'] == 200 - ), 'certificate chain intermediate server' + client.certificate('root', False) - def test_tls_certificate_chain_long(self, temp_dir): - self.load('empty') + req('int') + req('end') - self.generate_ca_conf() + generate_ca_conf() - # Minimum chain length is 3. - chain_length = 10 + ca(cert='root', out='int') + ca(cert='int', out='end') - for i in range(chain_length): - if i == 0: - self.certificate('root', False) - elif i == chain_length - 1: - self.req('end') - else: - self.req(f'int{i}') + crt_path = f'{temp_dir}/end-int.crt' + end_path = f'{temp_dir}/end.crt' + int_path = f'{temp_dir}/int.crt' + + with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( + int_path, 'rb' + ) as int: + crt.write(end.read() + int.read()) + + # incomplete chain + + assert 'success' in client.certificate_load( + 'end', 'end' + ), 'certificate chain end upload' + + chain = client.conf_get('/certificates/end/chain') + assert len(chain) == 1, 'certificate chain end length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end issuer common name' + + add_tls(cert='end') + + ctx_cert_req = context_cert_req() + try: + resp = client.get_ssl(context=ctx_cert_req) + except ssl.SSLError: + resp = None + + assert resp is None, 'certificate chain incomplete chain' + + # intermediate + + assert 'success' in client.certificate_load( + 'int', 'int' + ), 'certificate chain int upload' + + chain = client.conf_get('/certificates/int/chain') + assert len(chain) == 1, 'certificate chain int length' + assert ( + chain[0]['subject']['common_name'] == 'int' + ), 'certificate chain int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'root' + ), 'certificate chain int issuer common name' + + add_tls(cert='int') + + assert client.get_ssl()['status'] == 200, 'certificate chain intermediate' + + # intermediate server + + assert 'success' in client.certificate_load( + 'end-int', 'end' + ), 'certificate chain end-int upload' + + chain = client.conf_get('/certificates/end-int/chain') + assert len(chain) == 2, 'certificate chain end-int length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end-int int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end-int int issuer common name' + assert ( + chain[1]['subject']['common_name'] == 'int' + ), 'certificate chain end-int end subject common name' + assert ( + chain[1]['issuer']['common_name'] == 'root' + ), 'certificate chain end-int end issuer common name' + + add_tls(cert='end-int') + + assert ( + client.get_ssl(context=ctx_cert_req)['status'] == 200 + ), 'certificate chain intermediate server' + + +def test_tls_certificate_chain_long(temp_dir): + client.load('empty') - for i in range(chain_length - 1): - if i == 0: - self.ca(cert='root', out='int1') - elif i == chain_length - 2: - self.ca(cert=f'int{(chain_length - 2)}', out='end') - else: - self.ca(cert=f'int{i}', out=f'int{(i + 1)}') + generate_ca_conf() - for i in range(chain_length - 1, 0, -1): - path = ( - f'{temp_dir}/end.crt' - if i == chain_length - 1 - else f'{temp_dir}/int{i}.crt' - ) + # Minimum chain length is 3. + chain_length = 10 + + for i in range(chain_length): + if i == 0: + client.certificate('root', False) + elif i == chain_length - 1: + req('end') + else: + req(f'int{i}') + + for i in range(chain_length - 1): + if i == 0: + ca(cert='root', out='int1') + elif i == chain_length - 2: + ca(cert=f'int{(chain_length - 2)}', out='end') + else: + ca(cert=f'int{i}', out=f'int{(i + 1)}') + + for i in range(chain_length - 1, 0, -1): + path = ( + f'{temp_dir}/end.crt' + if i == chain_length - 1 + else f'{temp_dir}/int{i}.crt' + ) - with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert: - chain.write(cert.read()) + with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert: + chain.write(cert.read()) - self.set_certificate_req_context() + assert 'success' in client.certificate_load( + 'all', 'end' + ), 'certificate chain upload' - assert 'success' in self.certificate_load( - 'all', 'end' - ), 'certificate chain upload' + chain = client.conf_get('/certificates/all/chain') + assert len(chain) == chain_length - 1, 'certificate chain length' - chain = self.conf_get('/certificates/all/chain') - assert len(chain) == chain_length - 1, 'certificate chain length' + add_tls(cert='all') - self.add_tls(cert='all') + assert ( + client.get_ssl(context=context_cert_req())['status'] == 200 + ), 'certificate chain long' - assert self.get_ssl()['status'] == 200, 'certificate chain long' - def test_tls_certificate_empty_cn(self, temp_dir): - self.certificate('root', False) +def test_tls_certificate_empty_cn(): + client.certificate('root', False) - self.req(subject='/') + req(subject='/') - self.generate_ca_conf() - self.ca() + generate_ca_conf() + ca() - self.set_certificate_req_context() + assert 'success' in client.certificate_load('localhost', 'localhost') - assert 'success' in self.certificate_load('localhost', 'localhost') + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == {}, 'empty subject' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == {}, 'empty subject' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - def test_tls_certificate_empty_cn_san(self, temp_dir): - self.certificate('root', False) +def test_tls_certificate_empty_cn_san(): + client.certificate('root', False) - self.openssl_conf( - rewrite=True, alt_names=["example.com", "www.example.net"] - ) + client.openssl_conf( + rewrite=True, alt_names=["example.com", "www.example.net"] + ) - self.req(subject='/') + req(subject='/') - self.generate_ca_conf() - self.ca() + generate_ca_conf() + ca() - self.set_certificate_req_context() + assert 'success' in client.certificate_load('localhost', 'localhost') - assert 'success' in self.certificate_load('localhost', 'localhost') + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == { - 'alt_names': ['example.com', 'www.example.net'] - }, 'subject alt_names' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - def test_tls_certificate_empty_cn_san_ip(self): - self.certificate('root', False) +def test_tls_certificate_empty_cn_san_ip(): + client.certificate('root', False) - self.openssl_conf( - rewrite=True, - alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], - ) + client.openssl_conf( + rewrite=True, + alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'], + ) - self.req(subject='/') + req(subject='/') - self.generate_ca_conf() - self.ca() + generate_ca_conf() + ca() - self.set_certificate_req_context() + assert 'success' in client.certificate_load('localhost', 'localhost') - assert 'success' in self.certificate_load('localhost', 'localhost') + cert = client.conf_get('/certificates/localhost') + assert cert['chain'][0]['subject'] == { + 'alt_names': ['example.com', 'www.example.net'] + }, 'subject alt_names' + assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - cert = self.conf_get('/certificates/localhost') - assert cert['chain'][0]['subject'] == { - 'alt_names': ['example.com', 'www.example.net'] - }, 'subject alt_names' - assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer' - def test_tls_keepalive(self): - self.load('mirror') +def test_tls_keepalive(): + client.load('mirror') - assert self.get()['status'] == 200, 'init' + assert client.get()['status'] == 200, 'init' - self.certificate() + client.certificate() - self.add_tls(application='mirror') + add_tls(application='mirror') - (resp, sock) = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body='0123456789', - read_timeout=1, - ) + (resp, sock) = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body='0123456789', + read_timeout=1, + ) - assert resp['body'] == '0123456789', 'keepalive 1' + assert resp['body'] == '0123456789', 'keepalive 1' - resp = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'close', + resp = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'close', + }, + sock=sock, + body='0123456789', + ) + + assert resp['body'] == '0123456789', 'keepalive 2' + + +def test_tls_no_close_notify(): + client.certificate() + + assert 'success' in client.conf( + { + "listeners": { + "*:7080": { + "pass": "routes", + "tls": {"certificate": "default"}, + } }, - sock=sock, - body='0123456789', - ) + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' - assert resp['body'] == '0123456789', 'keepalive 2' - - def test_tls_no_close_notify(self): - self.certificate() - - assert 'success' in self.conf( - { - "listeners": { - "*:7080": { - "pass": "routes", - "tls": {"certificate": "default"}, - } - }, - "routes": [{"action": {"return": 200}}], - "applications": {}, - } - ), 'load application configuration' + (_, sock) = client.get_ssl(start=True) - (resp, sock) = self.get_ssl(start=True) + time.sleep(5) - time.sleep(5) + sock.close() - sock.close() - @pytest.mark.skip('not yet') - def test_tls_keepalive_certificate_remove(self): - self.load('empty') +@pytest.mark.skip('not yet') +def test_tls_keepalive_certificate_remove(): + client.load('empty') - assert self.get()['status'] == 200, 'init' + assert client.get()['status'] == 200, 'init' - self.certificate() + client.certificate() - self.add_tls() + add_tls() - (resp, sock) = self.get_ssl( - headers={'Host': 'localhost', 'Connection': 'keep-alive'}, - start=True, - read_timeout=1, - ) + (resp, sock) = client.get_ssl( + headers={'Host': 'localhost', 'Connection': 'keep-alive'}, + start=True, + read_timeout=1, + ) - assert 'success' in self.conf( - {"pass": "applications/empty"}, 'listeners/*:7080' - ) - assert 'success' in self.conf_delete('/certificates/default') + assert 'success' in client.conf( + {"pass": "applications/empty"}, 'listeners/*:7080' + ) + assert 'success' in client.conf_delete('/certificates/default') - try: - resp = self.get_ssl(sock=sock) + try: + resp = client.get_ssl(sock=sock) - except KeyboardInterrupt: - raise + except KeyboardInterrupt: + raise - except: - resp = None + except: + resp = None - assert resp == None, 'keepalive remove certificate' + assert resp is None, 'keepalive remove certificate' - @pytest.mark.skip('not yet') - def test_tls_certificates_remove_all(self): - self.load('empty') - self.certificate() +@pytest.mark.skip('not yet') +def test_tls_certificates_remove_all(): + client.load('empty') - assert 'success' in self.conf_delete( - '/certificates' - ), 'remove all certificates' + client.certificate() - def test_tls_application_respawn(self, skip_alert): - self.load('mirror') + assert 'success' in client.conf_delete( + '/certificates' + ), 'remove all certificates' - self.certificate() - assert 'success' in self.conf('1', 'applications/mirror/processes') +def test_tls_application_respawn(findall, skip_alert, wait_for_record): + client.load('mirror') - self.add_tls(application='mirror') + client.certificate() - (_, sock) = self.post_ssl( - headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - }, - start=True, - body='0123456789', - read_timeout=1, - ) + assert 'success' in client.conf('1', 'applications/mirror/processes') - app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] + add_tls(application='mirror') - subprocess.check_output(['kill', '-9', app_id]) + (_, sock) = client.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + }, + start=True, + body='0123456789', + read_timeout=1, + ) - skip_alert(fr'process {app_id} exited on signal 9') + app_id = findall(r'(\d+)#\d+ "mirror" application started')[0] - self.wait_for_record( - fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started' - ) + subprocess.check_output(['kill', '-9', app_id]) - resp = self.post_ssl(sock=sock, body='0123456789') + skip_alert(fr'process {app_id} exited on signal 9') - assert resp['status'] == 200, 'application respawn status' - assert resp['body'] == '0123456789', 'application respawn body' + wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started') - def test_tls_url_scheme(self): - self.load('variables') + resp = client.post_ssl(sock=sock, body='0123456789') - assert ( - self.post( - headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close', - } - )['headers']['Wsgi-Url-Scheme'] - == 'http' - ), 'url scheme http' + assert resp['status'] == 200, 'application respawn status' + assert resp['body'] == '0123456789', 'application respawn body' - self.certificate() - self.add_tls(application='variables') +def test_tls_url_scheme(): + client.load('variables') - assert ( - self.post_ssl( - headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close', - } - )['headers']['Wsgi-Url-Scheme'] - == 'https' - ), 'url scheme https' + assert ( + client.post( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'] + == 'http' + ), 'url scheme http' - def test_tls_big_upload(self): - self.load('upload') + client.certificate() - self.certificate() + add_tls(application='variables') - self.add_tls(application='upload') + assert ( + client.post_ssl( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'] + == 'https' + ), 'url scheme https' - filename = 'test.txt' - data = '0123456789' * 9000 - res = self.post_ssl( - body={ - 'file': { - 'filename': filename, - 'type': 'text/plain', - 'data': io.StringIO(data), - } +def test_tls_big_upload(): + client.load('upload') + + client.certificate() + + add_tls(application='upload') + + filename = 'test.txt' + data = '0123456789' * 9000 + + res = client.post_ssl( + body={ + 'file': { + 'filename': filename, + 'type': 'text/plain', + 'data': io.StringIO(data), } - ) - assert res['status'] == 200, 'status ok' - assert res['body'] == f'{filename}{data}' + } + ) + assert res['status'] == 200, 'status ok' + assert res['body'] == f'{filename}{data}' + - def test_tls_multi_listener(self): - self.load('empty') +def test_tls_multi_listener(): + client.load('empty') - self.certificate() + client.certificate() - self.add_tls() - self.add_tls(port=7081) + add_tls() + add_tls(port=7081) - assert self.get_ssl()['status'] == 200, 'listener #1' + assert client.get_ssl()['status'] == 200, 'listener #1' - assert self.get_ssl(port=7081)['status'] == 200, 'listener #2' + assert client.get_ssl(port=7081)['status'] == 200, 'listener #2' |