diff options
author | Konstantin Pavlov <thresh@nginx.com> | 2023-05-10 10:29:16 -0700 |
---|---|---|
committer | Konstantin Pavlov <thresh@nginx.com> | 2023-05-10 10:29:16 -0700 |
commit | 69235c513277c64b513447d9b92c3c03d616f577 (patch) | |
tree | 0780c92ba28d92b547c85ea0bee5e3040e14dee2 /test/test_tls.py | |
parent | b9bc222021e77bbdfb12576b3e315b962cf6b399 (diff) | |
parent | faf97dc06058de1c929af33a68adb34d3932b374 (diff) | |
download | unit-1.30.0-1.tar.gz unit-1.30.0-1.tar.bz2 |
Merged with the default branch.1.30.0-1
Diffstat (limited to '')
-rw-r--r-- | test/test_tls.py | 83 |
1 files changed, 39 insertions, 44 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index d4edcbd3..06c38d0b 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -17,19 +17,19 @@ class TestTLS(TestApplicationTLS): def add_tls(self, application='empty', cert='default', port=7080): assert 'success' in self.conf( { - "pass": "applications/" + application, + "pass": f"applications/{application}", "tls": {"certificate": cert}, }, - 'listeners/*:' + str(port), + f'listeners/*:{port}', ) def remove_tls(self, application='empty', port=7080): assert 'success' in self.conf( - {"pass": "applications/" + application}, 'listeners/*:' + str(port) + {"pass": f"applications/{application}"}, f'listeners/*:{port}' ) def req(self, name='localhost', subject=None, x509=False): - subj = subject if subject is not None else '/CN=' + name + '/' + subj = subject if subject is not None else f'/CN={name}/' subprocess.check_output( [ @@ -39,27 +39,27 @@ class TestTLS(TestApplicationTLS): '-subj', subj, '-config', - option.temp_dir + '/openssl.conf', + f'{option.temp_dir}/openssl.conf', '-out', - option.temp_dir + '/' + name + '.csr', + f'{option.temp_dir}/{name}.csr', '-keyout', - option.temp_dir + '/' + name + '.key', + f'{option.temp_dir}/{name}.key', ], stderr=subprocess.STDOUT, ) def generate_ca_conf(self): - with open(option.temp_dir + '/ca.conf', 'w') as f: + with open(f'{option.temp_dir}/ca.conf', 'w') as f: f.write( - """[ ca ] + f"""[ ca ] default_ca = myca [ myca ] -new_certs_dir = %(dir)s -database = %(database)s +new_certs_dir = {option.temp_dir} +database = {option.temp_dir}/certindex default_md = sha256 policy = myca_policy -serial = %(certserial)s +serial = {option.temp_dir}/certserial default_days = 1 x509_extensions = myca_extensions copy_extensions = copy @@ -69,20 +69,15 @@ commonName = optional [ myca_extensions ] basicConstraints = critical,CA:TRUE""" - % { - 'dir': option.temp_dir, - 'database': option.temp_dir + '/certindex', - 'certserial': option.temp_dir + '/certserial', - } ) - with open(option.temp_dir + '/certserial', 'w') as f: + with open(f'{option.temp_dir}/certserial', 'w') as f: f.write('1000') - with open(option.temp_dir + '/certindex', 'w') as f: + with open(f'{option.temp_dir}/certindex', 'w') as f: f.write('') - with open(option.temp_dir + '/certindex.attr', 'w') as f: + with open(f'{option.temp_dir}/certindex.attr', 'w') as f: f.write('') def ca(self, cert='root', out='localhost'): @@ -92,15 +87,15 @@ basicConstraints = critical,CA:TRUE""" 'ca', '-batch', '-config', - option.temp_dir + '/ca.conf', + f'{option.temp_dir}/ca.conf', '-keyfile', - option.temp_dir + '/' + cert + '.key', + f'{option.temp_dir}/{cert}.key', '-cert', - option.temp_dir + '/' + cert + '.crt', + f'{option.temp_dir}/{cert}.crt', '-in', - option.temp_dir + '/' + out + '.csr', + f'{option.temp_dir}/{out}.csr', '-out', - option.temp_dir + '/' + out + '.crt', + f'{option.temp_dir}/{out}.crt', ], stderr=subprocess.STDOUT, ) @@ -109,9 +104,7 @@ basicConstraints = critical,CA:TRUE""" self.context = ssl.create_default_context() self.context.check_hostname = False self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations( - option.temp_dir + '/' + cert + '.crt' - ) + self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt') def test_tls_listener_option_add(self): self.load('empty') @@ -230,7 +223,7 @@ basicConstraints = critical,CA:TRUE""" '-noout', '-genkey', '-out', - temp_dir + '/ec.key', + f'{temp_dir}/ec.key', '-name', 'prime256v1', ], @@ -246,11 +239,11 @@ basicConstraints = critical,CA:TRUE""" '-subj', '/CN=ec/', '-config', - temp_dir + '/openssl.conf', + f'{temp_dir}/openssl.conf', '-key', - temp_dir + '/ec.key', + f'{temp_dir}/ec.key', '-out', - temp_dir + '/ec.crt', + f'{temp_dir}/ec.crt', ], stderr=subprocess.STDOUT, ) @@ -305,9 +298,9 @@ basicConstraints = critical,CA:TRUE""" self.ca(cert='root', out='int') self.ca(cert='int', out='end') - crt_path = temp_dir + '/end-int.crt' - end_path = temp_dir + '/end.crt' - int_path = temp_dir + '/int.crt' + crt_path = f'{temp_dir}/end-int.crt' + end_path = f'{temp_dir}/end.crt' + int_path = f'{temp_dir}/int.crt' with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( int_path, 'rb' @@ -400,22 +393,24 @@ basicConstraints = critical,CA:TRUE""" elif i == chain_length - 1: self.req('end') else: - self.req('int{}'.format(i)) + self.req(f'int{i}') for i in range(chain_length - 1): if i == 0: self.ca(cert='root', out='int1') elif i == chain_length - 2: - self.ca(cert='int{}'.format(chain_length - 2), out='end') + self.ca(cert=f'int{(chain_length - 2)}', out='end') else: - self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1)) + self.ca(cert=f'int{i}', out=f'int{(i + 1)}') for i in range(chain_length - 1, 0, -1): - path = temp_dir + ( - '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i) + path = ( + f'{temp_dir}/end.crt' + if i == chain_length - 1 + else f'{temp_dir}/int{i}.crt' ) - with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert: + with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert: chain.write(cert.read()) self.set_certificate_req_context() @@ -611,10 +606,10 @@ basicConstraints = critical,CA:TRUE""" subprocess.check_output(['kill', '-9', app_id]) - skip_alert(r'process %s exited on signal 9' % app_id) + skip_alert(fr'process {app_id} exited on signal 9') self.wait_for_record( - r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started' + fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started' ) resp = self.post_ssl(sock=sock, body='0123456789') @@ -673,7 +668,7 @@ basicConstraints = critical,CA:TRUE""" } ) assert res['status'] == 200, 'status ok' - assert res['body'] == filename + data + assert res['body'] == f'{filename}{data}' def test_tls_multi_listener(self): self.load('empty') |