summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls.py
diff options
context:
space:
mode:
authorKonstantin Pavlov <thresh@nginx.com>2023-05-10 10:29:16 -0700
committerKonstantin Pavlov <thresh@nginx.com>2023-05-10 10:29:16 -0700
commit69235c513277c64b513447d9b92c3c03d616f577 (patch)
tree0780c92ba28d92b547c85ea0bee5e3040e14dee2 /test/test_tls.py
parentb9bc222021e77bbdfb12576b3e315b962cf6b399 (diff)
parentfaf97dc06058de1c929af33a68adb34d3932b374 (diff)
downloadunit-1.30.0-1.tar.gz
unit-1.30.0-1.tar.bz2
Merged with the default branch.1.30.0-1
Diffstat (limited to '')
-rw-r--r--test/test_tls.py83
1 files changed, 39 insertions, 44 deletions
diff --git a/test/test_tls.py b/test/test_tls.py
index d4edcbd3..06c38d0b 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -17,19 +17,19 @@ class TestTLS(TestApplicationTLS):
def add_tls(self, application='empty', cert='default', port=7080):
assert 'success' in self.conf(
{
- "pass": "applications/" + application,
+ "pass": f"applications/{application}",
"tls": {"certificate": cert},
},
- 'listeners/*:' + str(port),
+ f'listeners/*:{port}',
)
def remove_tls(self, application='empty', port=7080):
assert 'success' in self.conf(
- {"pass": "applications/" + application}, 'listeners/*:' + str(port)
+ {"pass": f"applications/{application}"}, f'listeners/*:{port}'
)
def req(self, name='localhost', subject=None, x509=False):
- subj = subject if subject is not None else '/CN=' + name + '/'
+ subj = subject if subject is not None else f'/CN={name}/'
subprocess.check_output(
[
@@ -39,27 +39,27 @@ class TestTLS(TestApplicationTLS):
'-subj',
subj,
'-config',
- option.temp_dir + '/openssl.conf',
+ f'{option.temp_dir}/openssl.conf',
'-out',
- option.temp_dir + '/' + name + '.csr',
+ f'{option.temp_dir}/{name}.csr',
'-keyout',
- option.temp_dir + '/' + name + '.key',
+ f'{option.temp_dir}/{name}.key',
],
stderr=subprocess.STDOUT,
)
def generate_ca_conf(self):
- with open(option.temp_dir + '/ca.conf', 'w') as f:
+ with open(f'{option.temp_dir}/ca.conf', 'w') as f:
f.write(
- """[ ca ]
+ f"""[ ca ]
default_ca = myca
[ myca ]
-new_certs_dir = %(dir)s
-database = %(database)s
+new_certs_dir = {option.temp_dir}
+database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
-serial = %(certserial)s
+serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy
@@ -69,20 +69,15 @@ commonName = optional
[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
- % {
- 'dir': option.temp_dir,
- 'database': option.temp_dir + '/certindex',
- 'certserial': option.temp_dir + '/certserial',
- }
)
- with open(option.temp_dir + '/certserial', 'w') as f:
+ with open(f'{option.temp_dir}/certserial', 'w') as f:
f.write('1000')
- with open(option.temp_dir + '/certindex', 'w') as f:
+ with open(f'{option.temp_dir}/certindex', 'w') as f:
f.write('')
- with open(option.temp_dir + '/certindex.attr', 'w') as f:
+ with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
f.write('')
def ca(self, cert='root', out='localhost'):
@@ -92,15 +87,15 @@ basicConstraints = critical,CA:TRUE"""
'ca',
'-batch',
'-config',
- option.temp_dir + '/ca.conf',
+ f'{option.temp_dir}/ca.conf',
'-keyfile',
- option.temp_dir + '/' + cert + '.key',
+ f'{option.temp_dir}/{cert}.key',
'-cert',
- option.temp_dir + '/' + cert + '.crt',
+ f'{option.temp_dir}/{cert}.crt',
'-in',
- option.temp_dir + '/' + out + '.csr',
+ f'{option.temp_dir}/{out}.csr',
'-out',
- option.temp_dir + '/' + out + '.crt',
+ f'{option.temp_dir}/{out}.crt',
],
stderr=subprocess.STDOUT,
)
@@ -109,9 +104,7 @@ basicConstraints = critical,CA:TRUE"""
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_REQUIRED
- self.context.load_verify_locations(
- option.temp_dir + '/' + cert + '.crt'
- )
+ self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
def test_tls_listener_option_add(self):
self.load('empty')
@@ -230,7 +223,7 @@ basicConstraints = critical,CA:TRUE"""
'-noout',
'-genkey',
'-out',
- temp_dir + '/ec.key',
+ f'{temp_dir}/ec.key',
'-name',
'prime256v1',
],
@@ -246,11 +239,11 @@ basicConstraints = critical,CA:TRUE"""
'-subj',
'/CN=ec/',
'-config',
- temp_dir + '/openssl.conf',
+ f'{temp_dir}/openssl.conf',
'-key',
- temp_dir + '/ec.key',
+ f'{temp_dir}/ec.key',
'-out',
- temp_dir + '/ec.crt',
+ f'{temp_dir}/ec.crt',
],
stderr=subprocess.STDOUT,
)
@@ -305,9 +298,9 @@ basicConstraints = critical,CA:TRUE"""
self.ca(cert='root', out='int')
self.ca(cert='int', out='end')
- crt_path = temp_dir + '/end-int.crt'
- end_path = temp_dir + '/end.crt'
- int_path = temp_dir + '/int.crt'
+ crt_path = f'{temp_dir}/end-int.crt'
+ end_path = f'{temp_dir}/end.crt'
+ int_path = f'{temp_dir}/int.crt'
with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
int_path, 'rb'
@@ -400,22 +393,24 @@ basicConstraints = critical,CA:TRUE"""
elif i == chain_length - 1:
self.req('end')
else:
- self.req('int{}'.format(i))
+ self.req(f'int{i}')
for i in range(chain_length - 1):
if i == 0:
self.ca(cert='root', out='int1')
elif i == chain_length - 2:
- self.ca(cert='int{}'.format(chain_length - 2), out='end')
+ self.ca(cert=f'int{(chain_length - 2)}', out='end')
else:
- self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
+ self.ca(cert=f'int{i}', out=f'int{(i + 1)}')
for i in range(chain_length - 1, 0, -1):
- path = temp_dir + (
- '/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
+ path = (
+ f'{temp_dir}/end.crt'
+ if i == chain_length - 1
+ else f'{temp_dir}/int{i}.crt'
)
- with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
+ with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
chain.write(cert.read())
self.set_certificate_req_context()
@@ -611,10 +606,10 @@ basicConstraints = critical,CA:TRUE"""
subprocess.check_output(['kill', '-9', app_id])
- skip_alert(r'process %s exited on signal 9' % app_id)
+ skip_alert(fr'process {app_id} exited on signal 9')
self.wait_for_record(
- r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
+ fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
)
resp = self.post_ssl(sock=sock, body='0123456789')
@@ -673,7 +668,7 @@ basicConstraints = critical,CA:TRUE"""
}
)
assert res['status'] == 200, 'status ok'
- assert res['body'] == filename + data
+ assert res['body'] == f'{filename}{data}'
def test_tls_multi_listener(self):
self.load('empty')