summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls.py
diff options
context:
space:
mode:
authorAndrei Zeliankou <zelenkov@nginx.com>2020-09-16 21:31:15 +0100
committerAndrei Zeliankou <zelenkov@nginx.com>2020-09-16 21:31:15 +0100
commitd5e915934066c77a59d211efafca10c117b73d05 (patch)
treef894a3c09bd8aa43e87276eed377eb09f97e46fe /test/test_tls.py
parent77ecb6ab49257dd662aa9c461fed3dc1d74e5092 (diff)
downloadunit-d5e915934066c77a59d211efafca10c117b73d05.tar.gz
unit-d5e915934066c77a59d211efafca10c117b73d05.tar.bz2
Tests: migrated to the pytest.
Diffstat (limited to 'test/test_tls.py')
-rw-r--r--test/test_tls.py398
1 files changed, 186 insertions, 212 deletions
diff --git a/test/test_tls.py b/test/test_tls.py
index a0434174..9881e973 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -1,17 +1,18 @@
import io
+import pytest
import re
import ssl
import subprocess
-import unittest
from unit.applications.tls import TestApplicationTLS
+from conftest import skip_alert
class TestTLS(TestApplicationTLS):
prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
def findall(self, pattern):
- with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
+ with open(self.temp_dir + '/unit.log', 'r', errors='ignore') as f:
return re.findall(pattern, f.read())
def openssl_date_to_sec_epoch(self, date):
@@ -38,7 +39,7 @@ class TestTLS(TestApplicationTLS):
self.add_tls()
- self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')
+ assert self.get_ssl()['status'] == 200, 'add listener option'
def test_tls_listener_option_remove(self):
self.load('empty')
@@ -51,18 +52,16 @@ class TestTLS(TestApplicationTLS):
self.remove_tls()
- self.assertEqual(self.get()['status'], 200, 'remove listener option')
+ assert self.get()['status'] == 200, 'remove listener option'
def test_tls_certificate_remove(self):
self.load('empty')
self.certificate()
- self.assertIn(
- 'success',
- self.conf_delete('/certificates/default'),
- 'remove certificate',
- )
+ assert 'success' in self.conf_delete(
+ '/certificates/default'
+ ), 'remove certificate'
def test_tls_certificate_remove_used(self):
self.load('empty')
@@ -71,11 +70,9 @@ class TestTLS(TestApplicationTLS):
self.add_tls()
- self.assertIn(
- 'error',
- self.conf_delete('/certificates/default'),
- 'remove certificate',
- )
+ assert 'error' in self.conf_delete(
+ '/certificates/default'
+ ), 'remove certificate'
def test_tls_certificate_remove_nonexisting(self):
self.load('empty')
@@ -84,13 +81,11 @@ class TestTLS(TestApplicationTLS):
self.add_tls()
- self.assertIn(
- 'error',
- self.conf_delete('/certificates/blah'),
- 'remove nonexistings certificate',
- )
+ assert 'error' in self.conf_delete(
+ '/certificates/blah'
+ ), 'remove nonexistings certificate'
- @unittest.skip('not yet')
+ @pytest.mark.skip('not yet')
def test_tls_certificate_update(self):
self.load('empty')
@@ -102,20 +97,18 @@ class TestTLS(TestApplicationTLS):
self.certificate()
- self.assertNotEqual(
- cert_old, self.get_server_certificate(), 'update certificate'
- )
+ assert cert_old != self.get_server_certificate(), 'update certificate'
- @unittest.skip('not yet')
+ @pytest.mark.skip('not yet')
def test_tls_certificate_key_incorrect(self):
self.load('empty')
self.certificate('first', False)
self.certificate('second', False)
- self.assertIn(
- 'error', self.certificate_load('first', 'second'), 'key incorrect'
- )
+ assert 'error' in self.certificate_load(
+ 'first', 'second'
+ ), 'key incorrect'
def test_tls_certificate_change(self):
self.load('empty')
@@ -129,20 +122,16 @@ class TestTLS(TestApplicationTLS):
self.add_tls(cert='new')
- self.assertNotEqual(
- cert_old, self.get_server_certificate(), 'change certificate'
- )
+ assert cert_old != self.get_server_certificate(), 'change certificate'
def test_tls_certificate_key_rsa(self):
self.load('empty')
self.certificate()
- self.assertEqual(
- self.conf_get('/certificates/default/key'),
- 'RSA (2048 bits)',
- 'certificate key rsa',
- )
+ assert (
+ self.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
+ ), 'certificate key rsa'
def test_tls_certificate_key_ec(self):
self.load('empty')
@@ -155,8 +144,10 @@ class TestTLS(TestApplicationTLS):
'ecparam',
'-noout',
'-genkey',
- '-out', self.testdir + '/ec.key',
- '-name', 'prime256v1',
+ '-out',
+ self.temp_dir + '/ec.key',
+ '-name',
+ 'prime256v1',
],
stderr=subprocess.STDOUT,
)
@@ -167,19 +158,23 @@ class TestTLS(TestApplicationTLS):
'req',
'-x509',
'-new',
- '-subj', '/CN=ec/',
- '-config', self.testdir + '/openssl.conf',
- '-key', self.testdir + '/ec.key',
- '-out', self.testdir + '/ec.crt',
+ '-subj',
+ '/CN=ec/',
+ '-config',
+ self.temp_dir + '/openssl.conf',
+ '-key',
+ self.temp_dir + '/ec.key',
+ '-out',
+ self.temp_dir + '/ec.crt',
],
stderr=subprocess.STDOUT,
)
self.certificate_load('ec')
- self.assertEqual(
- self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
- )
+ assert (
+ self.conf_get('/certificates/ec/key') == 'ECDH'
+ ), 'certificate key ec'
def test_tls_certificate_chain_options(self):
self.load('empty')
@@ -188,35 +183,29 @@ class TestTLS(TestApplicationTLS):
chain = self.conf_get('/certificates/default/chain')
- self.assertEqual(len(chain), 1, 'certificate chain length')
+ assert len(chain) == 1, 'certificate chain length'
cert = chain[0]
- self.assertEqual(
- cert['subject']['common_name'],
- 'default',
- 'certificate subject common name',
- )
- self.assertEqual(
- cert['issuer']['common_name'],
- 'default',
- 'certificate issuer common name',
- )
+ assert (
+ cert['subject']['common_name'] == 'default'
+ ), 'certificate subject common name'
+ assert (
+ cert['issuer']['common_name'] == 'default'
+ ), 'certificate issuer common name'
- self.assertLess(
+ assert (
abs(
self.sec_epoch()
- self.openssl_date_to_sec_epoch(cert['validity']['since'])
- ),
- 5,
- 'certificate validity since',
- )
- self.assertEqual(
+ )
+ < 5
+ ), 'certificate validity since'
+ assert (
self.openssl_date_to_sec_epoch(cert['validity']['until'])
- - self.openssl_date_to_sec_epoch(cert['validity']['since']),
- 2592000,
- 'certificate validity until',
- )
+ - self.openssl_date_to_sec_epoch(cert['validity']['since'])
+ == 2592000
+ ), 'certificate validity until'
def test_tls_certificate_chain(self):
self.load('empty')
@@ -228,10 +217,14 @@ class TestTLS(TestApplicationTLS):
'openssl',
'req',
'-new',
- '-subj', '/CN=int/',
- '-config', self.testdir + '/openssl.conf',
- '-out', self.testdir + '/int.csr',
- '-keyout', self.testdir + '/int.key',
+ '-subj',
+ '/CN=int/',
+ '-config',
+ self.temp_dir + '/openssl.conf',
+ '-out',
+ self.temp_dir + '/int.csr',
+ '-keyout',
+ self.temp_dir + '/int.key',
],
stderr=subprocess.STDOUT,
)
@@ -241,15 +234,19 @@ class TestTLS(TestApplicationTLS):
'openssl',
'req',
'-new',
- '-subj', '/CN=end/',
- '-config', self.testdir + '/openssl.conf',
- '-out', self.testdir + '/end.csr',
- '-keyout', self.testdir + '/end.key',
+ '-subj',
+ '/CN=end/',
+ '-config',
+ self.temp_dir + '/openssl.conf',
+ '-out',
+ self.temp_dir + '/end.csr',
+ '-keyout',
+ self.temp_dir + '/end.key',
],
stderr=subprocess.STDOUT,
)
- with open(self.testdir + '/ca.conf', 'w') as f:
+ with open(self.temp_dir + '/ca.conf', 'w') as f:
f.write(
"""[ ca ]
default_ca = myca
@@ -269,16 +266,16 @@ commonName = supplied
[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
% {
- 'dir': self.testdir,
- 'database': self.testdir + '/certindex',
- 'certserial': self.testdir + '/certserial',
+ 'dir': self.temp_dir,
+ 'database': self.temp_dir + '/certindex',
+ 'certserial': self.temp_dir + '/certserial',
}
)
- with open(self.testdir + '/certserial', 'w') as f:
+ with open(self.temp_dir + '/certserial', 'w') as f:
f.write('1000')
- with open(self.testdir + '/certindex', 'w') as f:
+ with open(self.temp_dir + '/certindex', 'w') as f:
f.write('')
subprocess.call(
@@ -286,12 +283,18 @@ basicConstraints = critical,CA:TRUE"""
'openssl',
'ca',
'-batch',
- '-subj', '/CN=int/',
- '-config', self.testdir + '/ca.conf',
- '-keyfile', self.testdir + '/root.key',
- '-cert', self.testdir + '/root.crt',
- '-in', self.testdir + '/int.csr',
- '-out', self.testdir + '/int.crt',
+ '-subj',
+ '/CN=int/',
+ '-config',
+ self.temp_dir + '/ca.conf',
+ '-keyfile',
+ self.temp_dir + '/root.key',
+ '-cert',
+ self.temp_dir + '/root.crt',
+ '-in',
+ self.temp_dir + '/int.csr',
+ '-out',
+ self.temp_dir + '/int.crt',
],
stderr=subprocess.STDOUT,
)
@@ -301,50 +304,50 @@ basicConstraints = critical,CA:TRUE"""
'openssl',
'ca',
'-batch',
- '-subj', '/CN=end/',
- '-config', self.testdir + '/ca.conf',
- '-keyfile', self.testdir + '/int.key',
- '-cert', self.testdir + '/int.crt',
- '-in', self.testdir + '/end.csr',
- '-out', self.testdir + '/end.crt',
+ '-subj',
+ '/CN=end/',
+ '-config',
+ self.temp_dir + '/ca.conf',
+ '-keyfile',
+ self.temp_dir + '/int.key',
+ '-cert',
+ self.temp_dir + '/int.crt',
+ '-in',
+ self.temp_dir + '/end.csr',
+ '-out',
+ self.temp_dir + '/end.crt',
],
stderr=subprocess.STDOUT,
)
- crt_path = self.testdir + '/end-int.crt'
- end_path = self.testdir + '/end.crt'
- int_path = self.testdir + '/int.crt'
+ crt_path = self.temp_dir + '/end-int.crt'
+ end_path = self.temp_dir + '/end.crt'
+ int_path = self.temp_dir + '/int.crt'
- with open(crt_path, 'wb') as crt, \
- open(end_path, 'rb') as end, \
- open(int_path, 'rb') as int:
+ with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
+ int_path, 'rb'
+ ) as int:
crt.write(end.read() + int.read())
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_REQUIRED
- self.context.load_verify_locations(self.testdir + '/root.crt')
+ self.context.load_verify_locations(self.temp_dir + '/root.crt')
# incomplete chain
- self.assertIn(
- 'success',
- self.certificate_load('end', 'end'),
- 'certificate chain end upload',
- )
+ assert 'success' in self.certificate_load(
+ 'end', 'end'
+ ), 'certificate chain end upload'
chain = self.conf_get('/certificates/end/chain')
- self.assertEqual(len(chain), 1, 'certificate chain end length')
- self.assertEqual(
- chain[0]['subject']['common_name'],
- 'end',
- 'certificate chain end subject common name',
- )
- self.assertEqual(
- chain[0]['issuer']['common_name'],
- 'int',
- 'certificate chain end issuer common name',
- )
+ assert len(chain) == 1, 'certificate chain end length'
+ assert (
+ chain[0]['subject']['common_name'] == 'end'
+ ), 'certificate chain end subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'int'
+ ), 'certificate chain end issuer common name'
self.add_tls(cert='end')
@@ -353,79 +356,61 @@ basicConstraints = critical,CA:TRUE"""
except ssl.SSLError:
resp = None
- self.assertEqual(resp, None, 'certificate chain incomplete chain')
+ assert resp == None, 'certificate chain incomplete chain'
# intermediate
- self.assertIn(
- 'success',
- self.certificate_load('int', 'int'),
- 'certificate chain int upload',
- )
+ assert 'success' in self.certificate_load(
+ 'int', 'int'
+ ), 'certificate chain int upload'
chain = self.conf_get('/certificates/int/chain')
- self.assertEqual(len(chain), 1, 'certificate chain int length')
- self.assertEqual(
- chain[0]['subject']['common_name'],
- 'int',
- 'certificate chain int subject common name',
- )
- self.assertEqual(
- chain[0]['issuer']['common_name'],
- 'root',
- 'certificate chain int issuer common name',
- )
+ assert len(chain) == 1, 'certificate chain int length'
+ assert (
+ chain[0]['subject']['common_name'] == 'int'
+ ), 'certificate chain int subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'root'
+ ), 'certificate chain int issuer common name'
self.add_tls(cert='int')
- self.assertEqual(
- self.get_ssl()['status'], 200, 'certificate chain intermediate'
- )
+ assert (
+ self.get_ssl()['status'] == 200
+ ), 'certificate chain intermediate'
# intermediate server
- self.assertIn(
- 'success',
- self.certificate_load('end-int', 'end'),
- 'certificate chain end-int upload',
- )
+ assert 'success' in self.certificate_load(
+ 'end-int', 'end'
+ ), 'certificate chain end-int upload'
chain = self.conf_get('/certificates/end-int/chain')
- self.assertEqual(len(chain), 2, 'certificate chain end-int length')
- self.assertEqual(
- chain[0]['subject']['common_name'],
- 'end',
- 'certificate chain end-int int subject common name',
- )
- self.assertEqual(
- chain[0]['issuer']['common_name'],
- 'int',
- 'certificate chain end-int int issuer common name',
- )
- self.assertEqual(
- chain[1]['subject']['common_name'],
- 'int',
- 'certificate chain end-int end subject common name',
- )
- self.assertEqual(
- chain[1]['issuer']['common_name'],
- 'root',
- 'certificate chain end-int end issuer common name',
- )
+ assert len(chain) == 2, 'certificate chain end-int length'
+ assert (
+ chain[0]['subject']['common_name'] == 'end'
+ ), 'certificate chain end-int int subject common name'
+ assert (
+ chain[0]['issuer']['common_name'] == 'int'
+ ), 'certificate chain end-int int issuer common name'
+ assert (
+ chain[1]['subject']['common_name'] == 'int'
+ ), 'certificate chain end-int end subject common name'
+ assert (
+ chain[1]['issuer']['common_name'] == 'root'
+ ), 'certificate chain end-int end issuer common name'
self.add_tls(cert='end-int')
- self.assertEqual(
- self.get_ssl()['status'],
- 200,
- 'certificate chain intermediate server',
- )
+ assert (
+ self.get_ssl()['status'] == 200
+ ), 'certificate chain intermediate server'
- @unittest.skip('not yet')
+ @pytest.mark.skip('not yet')
def test_tls_reconfigure(self):
self.load('empty')
- self.assertEqual(self.get()['status'], 200, 'init')
+ assert self.get()['status'] == 200, 'init'
self.certificate()
@@ -435,21 +420,17 @@ basicConstraints = critical,CA:TRUE"""
read_timeout=1,
)
- self.assertEqual(resp['status'], 200, 'initial status')
+ assert resp['status'] == 200, 'initial status'
self.add_tls()
- self.assertEqual(
- self.get(sock=sock)['status'], 200, 'reconfigure status'
- )
- self.assertEqual(
- self.get_ssl()['status'], 200, 'reconfigure tls status'
- )
+ assert self.get(sock=sock)['status'] == 200, 'reconfigure status'
+ assert self.get_ssl()['status'] == 200, 'reconfigure tls status'
def test_tls_keepalive(self):
self.load('mirror')
- self.assertEqual(self.get()['status'], 200, 'init')
+ assert self.get()['status'] == 200, 'init'
self.certificate()
@@ -466,7 +447,7 @@ basicConstraints = critical,CA:TRUE"""
read_timeout=1,
)
- self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
+ assert resp['body'] == '0123456789', 'keepalive 1'
resp = self.post_ssl(
headers={
@@ -478,13 +459,13 @@ basicConstraints = critical,CA:TRUE"""
body='0123456789',
)
- self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
+ assert resp['body'] == '0123456789', 'keepalive 2'
- @unittest.skip('not yet')
+ @pytest.mark.skip('not yet')
def test_tls_keepalive_certificate_remove(self):
self.load('empty')
- self.assertEqual(self.get()['status'], 200, 'init')
+ assert self.get()['status'] == 200, 'init'
self.certificate()
@@ -506,19 +487,17 @@ basicConstraints = critical,CA:TRUE"""
except:
resp = None
- self.assertEqual(resp, None, 'keepalive remove certificate')
+ assert resp == None, 'keepalive remove certificate'
- @unittest.skip('not yet')
+ @pytest.mark.skip('not yet')
def test_tls_certificates_remove_all(self):
self.load('empty')
self.certificate()
- self.assertIn(
- 'success',
- self.conf_delete('/certificates'),
- 'remove all certificates',
- )
+ assert 'success' in self.conf_delete(
+ '/certificates'
+ ), 'remove all certificates'
def test_tls_application_respawn(self):
self.load('mirror')
@@ -544,7 +523,7 @@ basicConstraints = critical,CA:TRUE"""
subprocess.call(['kill', '-9', app_id])
- self.skip_alerts.append(r'process %s exited on signal 9' % app_id)
+ skip_alert(r'process %s exited on signal 9' % app_id)
self.wait_for_record(
re.compile(
@@ -562,15 +541,13 @@ basicConstraints = critical,CA:TRUE"""
body='0123456789',
)
- self.assertEqual(resp['status'], 200, 'application respawn status')
- self.assertEqual(
- resp['body'], '0123456789', 'application respawn body'
- )
+ assert resp['status'] == 200, 'application respawn status'
+ assert resp['body'] == '0123456789', 'application respawn body'
def test_tls_url_scheme(self):
self.load('variables')
- self.assertEqual(
+ assert (
self.post(
headers={
'Host': 'localhost',
@@ -578,16 +555,15 @@ basicConstraints = critical,CA:TRUE"""
'Custom-Header': '',
'Connection': 'close',
}
- )['headers']['Wsgi-Url-Scheme'],
- 'http',
- 'url scheme http',
- )
+ )['headers']['Wsgi-Url-Scheme']
+ == 'http'
+ ), 'url scheme http'
self.certificate()
self.add_tls(application='variables')
- self.assertEqual(
+ assert (
self.post_ssl(
headers={
'Host': 'localhost',
@@ -595,10 +571,9 @@ basicConstraints = critical,CA:TRUE"""
'Custom-Header': '',
'Connection': 'close',
}
- )['headers']['Wsgi-Url-Scheme'],
- 'https',
- 'url scheme https',
- )
+ )['headers']['Wsgi-Url-Scheme']
+ == 'https'
+ ), 'url scheme https'
def test_tls_big_upload(self):
self.load('upload')
@@ -610,15 +585,14 @@ basicConstraints = critical,CA:TRUE"""
filename = 'test.txt'
data = '0123456789' * 9000
- res = self.post_ssl(body={
- 'file': {
- 'filename': filename,
- 'type': 'text/plain',
- 'data': io.StringIO(data),
+ res = self.post_ssl(
+ body={
+ 'file': {
+ 'filename': filename,
+ 'type': 'text/plain',
+ 'data': io.StringIO(data),
+ }
}
- })
- self.assertEqual(res['status'], 200, 'status ok')
- self.assertEqual(res['body'], filename + data)
-
-if __name__ == '__main__':
- TestTLS.main()
+ )
+ assert res['status'] == 200, 'status ok'
+ assert res['body'] == filename + data