diff options
author | Andrei Zeliankou <zelenkov@nginx.com> | 2020-09-16 21:31:15 +0100 |
---|---|---|
committer | Andrei Zeliankou <zelenkov@nginx.com> | 2020-09-16 21:31:15 +0100 |
commit | d5e915934066c77a59d211efafca10c117b73d05 (patch) | |
tree | f894a3c09bd8aa43e87276eed377eb09f97e46fe /test/test_tls.py | |
parent | 77ecb6ab49257dd662aa9c461fed3dc1d74e5092 (diff) | |
download | unit-d5e915934066c77a59d211efafca10c117b73d05.tar.gz unit-d5e915934066c77a59d211efafca10c117b73d05.tar.bz2 |
Tests: migrated to the pytest.
Diffstat (limited to 'test/test_tls.py')
-rw-r--r-- | test/test_tls.py | 398 |
1 files changed, 186 insertions, 212 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index a0434174..9881e973 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -1,17 +1,18 @@ import io +import pytest import re import ssl import subprocess -import unittest from unit.applications.tls import TestApplicationTLS +from conftest import skip_alert class TestTLS(TestApplicationTLS): prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} def findall(self, pattern): - with open(self.testdir + '/unit.log', 'r', errors='ignore') as f: + with open(self.temp_dir + '/unit.log', 'r', errors='ignore') as f: return re.findall(pattern, f.read()) def openssl_date_to_sec_epoch(self, date): @@ -38,7 +39,7 @@ class TestTLS(TestApplicationTLS): self.add_tls() - self.assertEqual(self.get_ssl()['status'], 200, 'add listener option') + assert self.get_ssl()['status'] == 200, 'add listener option' def test_tls_listener_option_remove(self): self.load('empty') @@ -51,18 +52,16 @@ class TestTLS(TestApplicationTLS): self.remove_tls() - self.assertEqual(self.get()['status'], 200, 'remove listener option') + assert self.get()['status'] == 200, 'remove listener option' def test_tls_certificate_remove(self): self.load('empty') self.certificate() - self.assertIn( - 'success', - self.conf_delete('/certificates/default'), - 'remove certificate', - ) + assert 'success' in self.conf_delete( + '/certificates/default' + ), 'remove certificate' def test_tls_certificate_remove_used(self): self.load('empty') @@ -71,11 +70,9 @@ class TestTLS(TestApplicationTLS): self.add_tls() - self.assertIn( - 'error', - self.conf_delete('/certificates/default'), - 'remove certificate', - ) + assert 'error' in self.conf_delete( + '/certificates/default' + ), 'remove certificate' def test_tls_certificate_remove_nonexisting(self): self.load('empty') @@ -84,13 +81,11 @@ class TestTLS(TestApplicationTLS): self.add_tls() - self.assertIn( - 'error', - self.conf_delete('/certificates/blah'), - 'remove nonexistings certificate', - ) + assert 'error' in self.conf_delete( + '/certificates/blah' + ), 'remove nonexistings certificate' - @unittest.skip('not yet') + @pytest.mark.skip('not yet') def test_tls_certificate_update(self): self.load('empty') @@ -102,20 +97,18 @@ class TestTLS(TestApplicationTLS): self.certificate() - self.assertNotEqual( - cert_old, self.get_server_certificate(), 'update certificate' - ) + assert cert_old != self.get_server_certificate(), 'update certificate' - @unittest.skip('not yet') + @pytest.mark.skip('not yet') def test_tls_certificate_key_incorrect(self): self.load('empty') self.certificate('first', False) self.certificate('second', False) - self.assertIn( - 'error', self.certificate_load('first', 'second'), 'key incorrect' - ) + assert 'error' in self.certificate_load( + 'first', 'second' + ), 'key incorrect' def test_tls_certificate_change(self): self.load('empty') @@ -129,20 +122,16 @@ class TestTLS(TestApplicationTLS): self.add_tls(cert='new') - self.assertNotEqual( - cert_old, self.get_server_certificate(), 'change certificate' - ) + assert cert_old != self.get_server_certificate(), 'change certificate' def test_tls_certificate_key_rsa(self): self.load('empty') self.certificate() - self.assertEqual( - self.conf_get('/certificates/default/key'), - 'RSA (2048 bits)', - 'certificate key rsa', - ) + assert ( + self.conf_get('/certificates/default/key') == 'RSA (2048 bits)' + ), 'certificate key rsa' def test_tls_certificate_key_ec(self): self.load('empty') @@ -155,8 +144,10 @@ class TestTLS(TestApplicationTLS): 'ecparam', '-noout', '-genkey', - '-out', self.testdir + '/ec.key', - '-name', 'prime256v1', + '-out', + self.temp_dir + '/ec.key', + '-name', + 'prime256v1', ], stderr=subprocess.STDOUT, ) @@ -167,19 +158,23 @@ class TestTLS(TestApplicationTLS): 'req', '-x509', '-new', - '-subj', '/CN=ec/', - '-config', self.testdir + '/openssl.conf', - '-key', self.testdir + '/ec.key', - '-out', self.testdir + '/ec.crt', + '-subj', + '/CN=ec/', + '-config', + self.temp_dir + '/openssl.conf', + '-key', + self.temp_dir + '/ec.key', + '-out', + self.temp_dir + '/ec.crt', ], stderr=subprocess.STDOUT, ) self.certificate_load('ec') - self.assertEqual( - self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec' - ) + assert ( + self.conf_get('/certificates/ec/key') == 'ECDH' + ), 'certificate key ec' def test_tls_certificate_chain_options(self): self.load('empty') @@ -188,35 +183,29 @@ class TestTLS(TestApplicationTLS): chain = self.conf_get('/certificates/default/chain') - self.assertEqual(len(chain), 1, 'certificate chain length') + assert len(chain) == 1, 'certificate chain length' cert = chain[0] - self.assertEqual( - cert['subject']['common_name'], - 'default', - 'certificate subject common name', - ) - self.assertEqual( - cert['issuer']['common_name'], - 'default', - 'certificate issuer common name', - ) + assert ( + cert['subject']['common_name'] == 'default' + ), 'certificate subject common name' + assert ( + cert['issuer']['common_name'] == 'default' + ), 'certificate issuer common name' - self.assertLess( + assert ( abs( self.sec_epoch() - self.openssl_date_to_sec_epoch(cert['validity']['since']) - ), - 5, - 'certificate validity since', - ) - self.assertEqual( + ) + < 5 + ), 'certificate validity since' + assert ( self.openssl_date_to_sec_epoch(cert['validity']['until']) - - self.openssl_date_to_sec_epoch(cert['validity']['since']), - 2592000, - 'certificate validity until', - ) + - self.openssl_date_to_sec_epoch(cert['validity']['since']) + == 2592000 + ), 'certificate validity until' def test_tls_certificate_chain(self): self.load('empty') @@ -228,10 +217,14 @@ class TestTLS(TestApplicationTLS): 'openssl', 'req', '-new', - '-subj', '/CN=int/', - '-config', self.testdir + '/openssl.conf', - '-out', self.testdir + '/int.csr', - '-keyout', self.testdir + '/int.key', + '-subj', + '/CN=int/', + '-config', + self.temp_dir + '/openssl.conf', + '-out', + self.temp_dir + '/int.csr', + '-keyout', + self.temp_dir + '/int.key', ], stderr=subprocess.STDOUT, ) @@ -241,15 +234,19 @@ class TestTLS(TestApplicationTLS): 'openssl', 'req', '-new', - '-subj', '/CN=end/', - '-config', self.testdir + '/openssl.conf', - '-out', self.testdir + '/end.csr', - '-keyout', self.testdir + '/end.key', + '-subj', + '/CN=end/', + '-config', + self.temp_dir + '/openssl.conf', + '-out', + self.temp_dir + '/end.csr', + '-keyout', + self.temp_dir + '/end.key', ], stderr=subprocess.STDOUT, ) - with open(self.testdir + '/ca.conf', 'w') as f: + with open(self.temp_dir + '/ca.conf', 'w') as f: f.write( """[ ca ] default_ca = myca @@ -269,16 +266,16 @@ commonName = supplied [ myca_extensions ] basicConstraints = critical,CA:TRUE""" % { - 'dir': self.testdir, - 'database': self.testdir + '/certindex', - 'certserial': self.testdir + '/certserial', + 'dir': self.temp_dir, + 'database': self.temp_dir + '/certindex', + 'certserial': self.temp_dir + '/certserial', } ) - with open(self.testdir + '/certserial', 'w') as f: + with open(self.temp_dir + '/certserial', 'w') as f: f.write('1000') - with open(self.testdir + '/certindex', 'w') as f: + with open(self.temp_dir + '/certindex', 'w') as f: f.write('') subprocess.call( @@ -286,12 +283,18 @@ basicConstraints = critical,CA:TRUE""" 'openssl', 'ca', '-batch', - '-subj', '/CN=int/', - '-config', self.testdir + '/ca.conf', - '-keyfile', self.testdir + '/root.key', - '-cert', self.testdir + '/root.crt', - '-in', self.testdir + '/int.csr', - '-out', self.testdir + '/int.crt', + '-subj', + '/CN=int/', + '-config', + self.temp_dir + '/ca.conf', + '-keyfile', + self.temp_dir + '/root.key', + '-cert', + self.temp_dir + '/root.crt', + '-in', + self.temp_dir + '/int.csr', + '-out', + self.temp_dir + '/int.crt', ], stderr=subprocess.STDOUT, ) @@ -301,50 +304,50 @@ basicConstraints = critical,CA:TRUE""" 'openssl', 'ca', '-batch', - '-subj', '/CN=end/', - '-config', self.testdir + '/ca.conf', - '-keyfile', self.testdir + '/int.key', - '-cert', self.testdir + '/int.crt', - '-in', self.testdir + '/end.csr', - '-out', self.testdir + '/end.crt', + '-subj', + '/CN=end/', + '-config', + self.temp_dir + '/ca.conf', + '-keyfile', + self.temp_dir + '/int.key', + '-cert', + self.temp_dir + '/int.crt', + '-in', + self.temp_dir + '/end.csr', + '-out', + self.temp_dir + '/end.crt', ], stderr=subprocess.STDOUT, ) - crt_path = self.testdir + '/end-int.crt' - end_path = self.testdir + '/end.crt' - int_path = self.testdir + '/int.crt' + crt_path = self.temp_dir + '/end-int.crt' + end_path = self.temp_dir + '/end.crt' + int_path = self.temp_dir + '/int.crt' - with open(crt_path, 'wb') as crt, \ - open(end_path, 'rb') as end, \ - open(int_path, 'rb') as int: + with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open( + int_path, 'rb' + ) as int: crt.write(end.read() + int.read()) self.context = ssl.create_default_context() self.context.check_hostname = False self.context.verify_mode = ssl.CERT_REQUIRED - self.context.load_verify_locations(self.testdir + '/root.crt') + self.context.load_verify_locations(self.temp_dir + '/root.crt') # incomplete chain - self.assertIn( - 'success', - self.certificate_load('end', 'end'), - 'certificate chain end upload', - ) + assert 'success' in self.certificate_load( + 'end', 'end' + ), 'certificate chain end upload' chain = self.conf_get('/certificates/end/chain') - self.assertEqual(len(chain), 1, 'certificate chain end length') - self.assertEqual( - chain[0]['subject']['common_name'], - 'end', - 'certificate chain end subject common name', - ) - self.assertEqual( - chain[0]['issuer']['common_name'], - 'int', - 'certificate chain end issuer common name', - ) + assert len(chain) == 1, 'certificate chain end length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end issuer common name' self.add_tls(cert='end') @@ -353,79 +356,61 @@ basicConstraints = critical,CA:TRUE""" except ssl.SSLError: resp = None - self.assertEqual(resp, None, 'certificate chain incomplete chain') + assert resp == None, 'certificate chain incomplete chain' # intermediate - self.assertIn( - 'success', - self.certificate_load('int', 'int'), - 'certificate chain int upload', - ) + assert 'success' in self.certificate_load( + 'int', 'int' + ), 'certificate chain int upload' chain = self.conf_get('/certificates/int/chain') - self.assertEqual(len(chain), 1, 'certificate chain int length') - self.assertEqual( - chain[0]['subject']['common_name'], - 'int', - 'certificate chain int subject common name', - ) - self.assertEqual( - chain[0]['issuer']['common_name'], - 'root', - 'certificate chain int issuer common name', - ) + assert len(chain) == 1, 'certificate chain int length' + assert ( + chain[0]['subject']['common_name'] == 'int' + ), 'certificate chain int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'root' + ), 'certificate chain int issuer common name' self.add_tls(cert='int') - self.assertEqual( - self.get_ssl()['status'], 200, 'certificate chain intermediate' - ) + assert ( + self.get_ssl()['status'] == 200 + ), 'certificate chain intermediate' # intermediate server - self.assertIn( - 'success', - self.certificate_load('end-int', 'end'), - 'certificate chain end-int upload', - ) + assert 'success' in self.certificate_load( + 'end-int', 'end' + ), 'certificate chain end-int upload' chain = self.conf_get('/certificates/end-int/chain') - self.assertEqual(len(chain), 2, 'certificate chain end-int length') - self.assertEqual( - chain[0]['subject']['common_name'], - 'end', - 'certificate chain end-int int subject common name', - ) - self.assertEqual( - chain[0]['issuer']['common_name'], - 'int', - 'certificate chain end-int int issuer common name', - ) - self.assertEqual( - chain[1]['subject']['common_name'], - 'int', - 'certificate chain end-int end subject common name', - ) - self.assertEqual( - chain[1]['issuer']['common_name'], - 'root', - 'certificate chain end-int end issuer common name', - ) + assert len(chain) == 2, 'certificate chain end-int length' + assert ( + chain[0]['subject']['common_name'] == 'end' + ), 'certificate chain end-int int subject common name' + assert ( + chain[0]['issuer']['common_name'] == 'int' + ), 'certificate chain end-int int issuer common name' + assert ( + chain[1]['subject']['common_name'] == 'int' + ), 'certificate chain end-int end subject common name' + assert ( + chain[1]['issuer']['common_name'] == 'root' + ), 'certificate chain end-int end issuer common name' self.add_tls(cert='end-int') - self.assertEqual( - self.get_ssl()['status'], - 200, - 'certificate chain intermediate server', - ) + assert ( + self.get_ssl()['status'] == 200 + ), 'certificate chain intermediate server' - @unittest.skip('not yet') + @pytest.mark.skip('not yet') def test_tls_reconfigure(self): self.load('empty') - self.assertEqual(self.get()['status'], 200, 'init') + assert self.get()['status'] == 200, 'init' self.certificate() @@ -435,21 +420,17 @@ basicConstraints = critical,CA:TRUE""" read_timeout=1, ) - self.assertEqual(resp['status'], 200, 'initial status') + assert resp['status'] == 200, 'initial status' self.add_tls() - self.assertEqual( - self.get(sock=sock)['status'], 200, 'reconfigure status' - ) - self.assertEqual( - self.get_ssl()['status'], 200, 'reconfigure tls status' - ) + assert self.get(sock=sock)['status'] == 200, 'reconfigure status' + assert self.get_ssl()['status'] == 200, 'reconfigure tls status' def test_tls_keepalive(self): self.load('mirror') - self.assertEqual(self.get()['status'], 200, 'init') + assert self.get()['status'] == 200, 'init' self.certificate() @@ -466,7 +447,7 @@ basicConstraints = critical,CA:TRUE""" read_timeout=1, ) - self.assertEqual(resp['body'], '0123456789', 'keepalive 1') + assert resp['body'] == '0123456789', 'keepalive 1' resp = self.post_ssl( headers={ @@ -478,13 +459,13 @@ basicConstraints = critical,CA:TRUE""" body='0123456789', ) - self.assertEqual(resp['body'], '0123456789', 'keepalive 2') + assert resp['body'] == '0123456789', 'keepalive 2' - @unittest.skip('not yet') + @pytest.mark.skip('not yet') def test_tls_keepalive_certificate_remove(self): self.load('empty') - self.assertEqual(self.get()['status'], 200, 'init') + assert self.get()['status'] == 200, 'init' self.certificate() @@ -506,19 +487,17 @@ basicConstraints = critical,CA:TRUE""" except: resp = None - self.assertEqual(resp, None, 'keepalive remove certificate') + assert resp == None, 'keepalive remove certificate' - @unittest.skip('not yet') + @pytest.mark.skip('not yet') def test_tls_certificates_remove_all(self): self.load('empty') self.certificate() - self.assertIn( - 'success', - self.conf_delete('/certificates'), - 'remove all certificates', - ) + assert 'success' in self.conf_delete( + '/certificates' + ), 'remove all certificates' def test_tls_application_respawn(self): self.load('mirror') @@ -544,7 +523,7 @@ basicConstraints = critical,CA:TRUE""" subprocess.call(['kill', '-9', app_id]) - self.skip_alerts.append(r'process %s exited on signal 9' % app_id) + skip_alert(r'process %s exited on signal 9' % app_id) self.wait_for_record( re.compile( @@ -562,15 +541,13 @@ basicConstraints = critical,CA:TRUE""" body='0123456789', ) - self.assertEqual(resp['status'], 200, 'application respawn status') - self.assertEqual( - resp['body'], '0123456789', 'application respawn body' - ) + assert resp['status'] == 200, 'application respawn status' + assert resp['body'] == '0123456789', 'application respawn body' def test_tls_url_scheme(self): self.load('variables') - self.assertEqual( + assert ( self.post( headers={ 'Host': 'localhost', @@ -578,16 +555,15 @@ basicConstraints = critical,CA:TRUE""" 'Custom-Header': '', 'Connection': 'close', } - )['headers']['Wsgi-Url-Scheme'], - 'http', - 'url scheme http', - ) + )['headers']['Wsgi-Url-Scheme'] + == 'http' + ), 'url scheme http' self.certificate() self.add_tls(application='variables') - self.assertEqual( + assert ( self.post_ssl( headers={ 'Host': 'localhost', @@ -595,10 +571,9 @@ basicConstraints = critical,CA:TRUE""" 'Custom-Header': '', 'Connection': 'close', } - )['headers']['Wsgi-Url-Scheme'], - 'https', - 'url scheme https', - ) + )['headers']['Wsgi-Url-Scheme'] + == 'https' + ), 'url scheme https' def test_tls_big_upload(self): self.load('upload') @@ -610,15 +585,14 @@ basicConstraints = critical,CA:TRUE""" filename = 'test.txt' data = '0123456789' * 9000 - res = self.post_ssl(body={ - 'file': { - 'filename': filename, - 'type': 'text/plain', - 'data': io.StringIO(data), + res = self.post_ssl( + body={ + 'file': { + 'filename': filename, + 'type': 'text/plain', + 'data': io.StringIO(data), + } } - }) - self.assertEqual(res['status'], 200, 'status ok') - self.assertEqual(res['body'], filename + data) - -if __name__ == '__main__': - TestTLS.main() + ) + assert res['status'] == 200, 'status ok' + assert res['body'] == filename + data |