diff options
author | Andrey Zelenkov <zelenkov@nginx.com> | 2019-03-26 23:38:30 +0300 |
---|---|---|
committer | Andrey Zelenkov <zelenkov@nginx.com> | 2019-03-26 23:38:30 +0300 |
commit | 281899fcef10eaf815d90958d49243c5060ffac0 (patch) | |
tree | 258d01fb832a4253815db41673c9fcc73f128e6b /test/test_tls.py | |
parent | 3d7a47c9acb1120f90225c833fd56cffeb99c2cd (diff) | |
download | unit-281899fcef10eaf815d90958d49243c5060ffac0.tar.gz unit-281899fcef10eaf815d90958d49243c5060ffac0.tar.bz2 |
Tests: style.
Diffstat (limited to '')
-rw-r--r-- | test/test_tls.py | 469 |
1 files changed, 307 insertions, 162 deletions
diff --git a/test/test_tls.py b/test/test_tls.py index d9b2e512..8b112f4e 100644 --- a/test/test_tls.py +++ b/test/test_tls.py @@ -5,8 +5,8 @@ import subprocess import unittest import unit -class TestUnitTLS(unit.TestUnitApplicationTLS): +class TestUnitTLS(unit.TestUnitApplicationTLS): def setUpClass(): unit.TestUnit().check_modules('python', 'openssl') @@ -26,17 +26,13 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z') def add_tls(self, application='empty', cert='default', port=7080): - self.conf({ - "application": application, - "tls": { - "certificate": cert - } - }, 'listeners/*:' + str(port)) + self.conf( + {"application": application, "tls": {"certificate": cert}}, + 'listeners/*:' + str(port), + ) def remove_tls(self, application='empty', port=7080): - self.conf({ - "application": application - }, 'listeners/*:' + str(port)) + self.conf({"application": application}, 'listeners/*:' + str(port)) def test_tls_listener_option_add(self): self.load('empty') @@ -65,8 +61,11 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.certificate() - self.assertIn('success', self.conf_delete('/certificates/default'), - 'remove certificate') + self.assertIn( + 'success', + self.conf_delete('/certificates/default'), + 'remove certificate', + ) def test_tls_certificate_remove_used(self): self.load('empty') @@ -75,8 +74,11 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.add_tls() - self.assertIn('error', self.conf_delete('/certificates/default'), - 'remove certificate') + self.assertIn( + 'error', + self.conf_delete('/certificates/default'), + 'remove certificate', + ) def test_tls_certificate_remove_nonexisting(self): self.load('empty') @@ -85,8 +87,11 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.add_tls() - self.assertIn('error', self.conf_delete('/certificates/blah'), - 'remove nonexistings certificate') + self.assertIn( + 'error', + self.conf_delete('/certificates/blah'), + 'remove nonexistings certificate', + ) @unittest.expectedFailure def test_tls_certificate_update(self): @@ -100,8 +105,9 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.certificate() - self.assertNotEqual(cert_old, self.get_server_certificate(), - 'update certificate') + self.assertNotEqual( + cert_old, self.get_server_certificate(), 'update certificate' + ) @unittest.expectedFailure def test_tls_certificate_key_incorrect(self): @@ -110,8 +116,9 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.certificate('first', False) self.certificate('second', False) - self.assertIn('error', self.certificate_load('first', 'second'), - 'key incorrect') + self.assertIn( + 'error', self.certificate_load('first', 'second'), 'key incorrect' + ) def test_tls_certificate_change(self): self.load('empty') @@ -125,33 +132,53 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): self.add_tls(cert='new') - self.assertNotEqual(cert_old, self.get_server_certificate(), - 'change certificate') + self.assertNotEqual( + cert_old, self.get_server_certificate(), 'change certificate' + ) def test_tls_certificate_key_rsa(self): self.load('empty') self.certificate() - self.assertEqual(self.conf_get('/certificates/default/key'), - 'RSA (1024 bits)', 'certificate key rsa') + self.assertEqual( + self.conf_get('/certificates/default/key'), + 'RSA (1024 bits)', + 'certificate key rsa', + ) def test_tls_certificate_key_ec(self): self.load('empty') - subprocess.call(['openssl', 'ecparam', '-noout', '-genkey', - '-out', self.testdir + '/ec.key', - '-name', 'prime256v1']) - - subprocess.call(['openssl', 'req', '-x509', '-new', - '-config', self.testdir + '/openssl.conf', - '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/', - '-out', self.testdir + '/ec.crt']) + subprocess.call( + [ + 'openssl', + 'ecparam', + '-noout', + '-genkey', + '-out', self.testdir + '/ec.key', + '-name', 'prime256v1', + ] + ) + + subprocess.call( + [ + 'openssl', + 'req', + '-x509', + '-new', + '-subj', '/CN=ec/', + '-config', self.testdir + '/openssl.conf', + '-key', self.testdir + '/ec.key', + '-out', self.testdir + '/ec.crt', + ] + ) self.certificate_load('ec') - self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH', - 'certificate key ec') + self.assertEqual( + self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec' + ) def test_tls_certificate_chain_options(self): self.load('empty') @@ -164,36 +191,64 @@ class TestUnitTLS(unit.TestUnitApplicationTLS): cert = chain[0] - self.assertEqual(cert['subject']['common_name'], 'default', - 'certificate subject common name') - self.assertEqual(cert['issuer']['common_name'], 'default', - 'certificate issuer common name') - - self.assertLess(abs(self.sec_epoch() - - self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5, - 'certificate validity since') self.assertEqual( - self.openssl_date_to_sec_epoch(cert['validity']['until']) - - self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000, - 'certificate validity until') + cert['subject']['common_name'], + 'default', + 'certificate subject common name', + ) + self.assertEqual( + cert['issuer']['common_name'], + 'default', + 'certificate issuer common name', + ) + + self.assertLess( + abs( + self.sec_epoch() + - self.openssl_date_to_sec_epoch(cert['validity']['since']) + ), + 5, + 'certificate validity since', + ) + self.assertEqual( + self.openssl_date_to_sec_epoch(cert['validity']['until']) + - self.openssl_date_to_sec_epoch(cert['validity']['since']), + 2592000, + 'certificate validity until', + ) def test_tls_certificate_chain(self): self.load('empty') self.certificate('root', False) - subprocess.call(['openssl', 'req', '-new', '-config', - self.testdir + '/openssl.conf', '-subj', '/CN=int/', - '-out', self.testdir + '/int.csr', - '-keyout', self.testdir + '/int.key']) - - subprocess.call(['openssl', 'req', '-new', '-config', - self.testdir + '/openssl.conf', '-subj', '/CN=end/', - '-out', self.testdir + '/end.csr', - '-keyout', self.testdir + '/end.key']) + subprocess.call( + [ + 'openssl', + 'req', + '-new', + '-subj', '/CN=int/', + '-config', self.testdir + '/openssl.conf', + '-out', self.testdir + '/int.csr', + '-keyout', self.testdir + '/int.key', + ] + ) + + subprocess.call( + [ + 'openssl', + 'req', + '-new', + '-subj', '/CN=end/', + '-config', self.testdir + '/openssl.conf', + '-out', self.testdir + '/end.csr', + '-keyout', self.testdir + '/end.key', + ] + ) with open(self.testdir + '/ca.conf', 'w') as f: - f.write("""[ ca ] + f.write( + """[ ca ] default_ca = myca [ myca ] @@ -209,11 +264,13 @@ x509_extensions = myca_extensions commonName = supplied [ myca_extensions ] -basicConstraints = critical,CA:TRUE""" % { - 'dir': self.testdir, - 'database': self.testdir + '/certindex', - 'certserial': self.testdir + '/certserial' - }) +basicConstraints = critical,CA:TRUE""" + % { + 'dir': self.testdir, + 'database': self.testdir + '/certindex', + 'certserial': self.testdir + '/certserial', + } + ) with open(self.testdir + '/certserial', 'w') as f: f.write('1000') @@ -221,26 +278,42 @@ basicConstraints = critical,CA:TRUE""" % { with open(self.testdir + '/certindex', 'w') as f: f.write('') - subprocess.call(['openssl', 'ca', '-batch', - '-config', self.testdir + '/ca.conf', - '-keyfile', self.testdir + '/root.key', - '-cert', self.testdir + '/root.crt', - '-subj', '/CN=int/', - '-in', self.testdir + '/int.csr', - '-out', self.testdir + '/int.crt']) - - subprocess.call(['openssl', 'ca', '-batch', - '-config', self.testdir + '/ca.conf', - '-keyfile', self.testdir + '/int.key', - '-cert', self.testdir + '/int.crt', - '-subj', '/CN=end/', - '-in', self.testdir + '/end.csr', - '-out', self.testdir + '/end.crt']) - - with open(self.testdir + '/end-int.crt', 'wb') as crt, \ - open(self.testdir + '/end.crt', 'rb') as end, \ - open(self.testdir + '/int.crt', 'rb') as int: - crt.write(end.read() + int.read()) + subprocess.call( + [ + 'openssl', + 'ca', + '-batch', + '-subj', '/CN=int/', + '-config', self.testdir + '/ca.conf', + '-keyfile', self.testdir + '/root.key', + '-cert', self.testdir + '/root.crt', + '-in', self.testdir + '/int.csr', + '-out', self.testdir + '/int.crt', + ] + ) + + subprocess.call( + [ + 'openssl', + 'ca', + '-batch', + '-subj', '/CN=end/', + '-config', self.testdir + '/ca.conf', + '-keyfile', self.testdir + '/int.key', + '-cert', self.testdir + '/int.crt', + '-in', self.testdir + '/end.csr', + '-out', self.testdir + '/end.crt', + ] + ) + + crt_path = self.testdir + '/end-int.crt' + end_path = self.testdir + '/end.crt' + int_path = self.testdir + '/int.crt' + + with open(crt_path, 'wb') as crt, \ + open(end_path, 'rb') as end, \ + open(int_path, 'rb') as int: + crt.write(end.read() + int.read()) self.context = ssl.create_default_context() self.context.check_hostname = False @@ -249,15 +322,24 @@ basicConstraints = critical,CA:TRUE""" % { # incomplete chain - self.assertIn('success', self.certificate_load('end', 'end'), - 'certificate chain end upload') + self.assertIn( + 'success', + self.certificate_load('end', 'end'), + 'certificate chain end upload', + ) chain = self.conf_get('/certificates/end/chain') self.assertEqual(len(chain), 1, 'certificate chain end length') - self.assertEqual(chain[0]['subject']['common_name'], 'end', - 'certificate chain end subject common name') - self.assertEqual(chain[0]['issuer']['common_name'], 'int', - 'certificate chain end issuer common name') + self.assertEqual( + chain[0]['subject']['common_name'], + 'end', + 'certificate chain end subject common name', + ) + self.assertEqual( + chain[0]['issuer']['common_name'], + 'int', + 'certificate chain end issuer common name', + ) self.add_tls(cert='end') @@ -270,41 +352,69 @@ basicConstraints = critical,CA:TRUE""" % { # intermediate - self.assertIn('success', self.certificate_load('int', 'int'), - 'certificate chain int upload') + self.assertIn( + 'success', + self.certificate_load('int', 'int'), + 'certificate chain int upload', + ) chain = self.conf_get('/certificates/int/chain') self.assertEqual(len(chain), 1, 'certificate chain int length') - self.assertEqual(chain[0]['subject']['common_name'], 'int', - 'certificate chain int subject common name') - self.assertEqual(chain[0]['issuer']['common_name'], 'root', - 'certificate chain int issuer common name') + self.assertEqual( + chain[0]['subject']['common_name'], + 'int', + 'certificate chain int subject common name', + ) + self.assertEqual( + chain[0]['issuer']['common_name'], + 'root', + 'certificate chain int issuer common name', + ) self.add_tls(cert='int') - self.assertEqual(self.get_ssl()['status'], 200, - 'certificate chain intermediate') + self.assertEqual( + self.get_ssl()['status'], 200, 'certificate chain intermediate' + ) # intermediate server - self.assertIn('success', self.certificate_load('end-int', 'end'), - 'certificate chain end-int upload') + self.assertIn( + 'success', + self.certificate_load('end-int', 'end'), + 'certificate chain end-int upload', + ) chain = self.conf_get('/certificates/end-int/chain') self.assertEqual(len(chain), 2, 'certificate chain end-int length') - self.assertEqual(chain[0]['subject']['common_name'], 'end', - 'certificate chain end-int int subject common name') - self.assertEqual(chain[0]['issuer']['common_name'], 'int', - 'certificate chain end-int int issuer common name') - self.assertEqual(chain[1]['subject']['common_name'], 'int', - 'certificate chain end-int end subject common name') - self.assertEqual(chain[1]['issuer']['common_name'], 'root', - 'certificate chain end-int end issuer common name') + self.assertEqual( + chain[0]['subject']['common_name'], + 'end', + 'certificate chain end-int int subject common name', + ) + self.assertEqual( + chain[0]['issuer']['common_name'], + 'int', + 'certificate chain end-int int issuer common name', + ) + self.assertEqual( + chain[1]['subject']['common_name'], + 'int', + 'certificate chain end-int end subject common name', + ) + self.assertEqual( + chain[1]['issuer']['common_name'], + 'root', + 'certificate chain end-int end issuer common name', + ) self.add_tls(cert='end-int') - self.assertEqual(self.get_ssl()['status'], 200, - 'certificate chain intermediate server') + self.assertEqual( + self.get_ssl()['status'], + 200, + 'certificate chain intermediate server', + ) @unittest.expectedFailure def test_tls_reconfigure(self): @@ -312,19 +422,21 @@ basicConstraints = critical,CA:TRUE""" % { self.certificate() - (resp, sock) = self.get(headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive' - }, start=True) + (resp, sock) = self.get( + headers={'Host': 'localhost', 'Connection': 'keep-alive'}, + start=True, + ) self.assertEqual(resp['status'], 200, 'initial status') self.add_tls() - self.assertEqual(self.get(sock=sock)['status'], 200, - 'reconfigure status') - self.assertEqual(self.get_ssl()['status'], 200, - 'reconfigure tls status') + self.assertEqual( + self.get(sock=sock)['status'], 200, 'reconfigure status' + ) + self.assertEqual( + self.get_ssl()['status'], 200, 'reconfigure tls status' + ) def test_tls_keepalive(self): self.load('mirror') @@ -333,19 +445,27 @@ basicConstraints = critical,CA:TRUE""" % { self.add_tls(application='mirror') - (resp, sock) = self.post_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - 'Content-Type': 'text/html' - }, start=True, body='0123456789') + (resp, sock) = self.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + 'Content-Type': 'text/html', + }, + start=True, + body='0123456789', + ) self.assertEqual(resp['body'], '0123456789', 'keepalive 1') - resp = self.post_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'close', - 'Content-Type': 'text/html' - }, sock=sock, body='0123456789') + resp = self.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'close', + 'Content-Type': 'text/html', + }, + sock=sock, + body='0123456789', + ) self.assertEqual(resp['body'], '0123456789', 'keepalive 2') @@ -357,21 +477,18 @@ basicConstraints = critical,CA:TRUE""" % { self.add_tls() - (resp, sock) = self.get_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive' - }, start=True) + (resp, sock) = self.get_ssl( + headers={'Host': 'localhost', 'Connection': 'keep-alive'}, + start=True, + ) - self.conf({ - "application": "empty" - }, 'listeners/*:7080') + self.conf({"application": "empty"}, 'listeners/*:7080') self.conf_delete('/certificates/default') try: - resp = self.get_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'close' - }, sock=sock) + resp = self.get_ssl( + headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock + ) except: resp = None @@ -383,8 +500,11 @@ basicConstraints = critical,CA:TRUE""" % { self.certificate() - self.assertIn('success', self.conf_delete('/certificates'), - 'remove all certificates') + self.assertIn( + 'success', + self.conf_delete('/certificates'), + 'remove all certificates', + ) def test_tls_application_respawn(self): self.skip_alerts.append(r'process \d+ exited on signal 9') @@ -396,48 +516,73 @@ basicConstraints = critical,CA:TRUE""" % { self.add_tls(application='mirror') - (resp, sock) = self.post_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'keep-alive', - 'Content-Type': 'text/html' - }, start=True, body='0123456789') + (resp, sock) = self.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'keep-alive', + 'Content-Type': 'text/html', + }, + start=True, + body='0123456789', + ) app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0] subprocess.call(['kill', '-9', app_id]) - self.wait_for_record(re.compile(' (?!' + app_id + - '#)(\d+)#\d+ "mirror" application started')) + self.wait_for_record( + re.compile( + ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started' + ) + ) - resp = self.post_ssl(headers={ - 'Host': 'localhost', - 'Connection': 'close', - 'Content-Type': 'text/html' - }, sock=sock, body='0123456789') + resp = self.post_ssl( + headers={ + 'Host': 'localhost', + 'Connection': 'close', + 'Content-Type': 'text/html', + }, + sock=sock, + body='0123456789', + ) self.assertEqual(resp['status'], 200, 'application respawn status') - self.assertEqual(resp['body'], '0123456789', 'application respawn body') + self.assertEqual( + resp['body'], '0123456789', 'application respawn body' + ) def test_tls_url_scheme(self): self.load('variables') - self.assertEqual(self.post(headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close' - })['headers']['Wsgi-Url-Scheme'], 'http', 'url scheme http') + self.assertEqual( + self.post( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'], + 'http', + 'url scheme http', + ) self.certificate() self.add_tls(application='variables') - self.assertEqual(self.post_ssl(headers={ - 'Host': 'localhost', - 'Content-Type': 'text/html', - 'Custom-Header': '', - 'Connection': 'close' - })['headers']['Wsgi-Url-Scheme'], 'https', 'url scheme https') + self.assertEqual( + self.post_ssl( + headers={ + 'Host': 'localhost', + 'Content-Type': 'text/html', + 'Custom-Header': '', + 'Connection': 'close', + } + )['headers']['Wsgi-Url-Scheme'], + 'https', + 'url scheme https', + ) if __name__ == '__main__': TestUnitTLS.main() |