summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_tls.py')
-rw-r--r--test/test_tls.py508
1 files changed, 341 insertions, 167 deletions
diff --git a/test/test_tls.py b/test/test_tls.py
index 2131bf30..f055aa24 100644
--- a/test/test_tls.py
+++ b/test/test_tls.py
@@ -3,40 +3,32 @@ import ssl
import time
import subprocess
import unittest
-import unit
+from unit.applications.tls import TestApplicationTLS
-class TestUnitTLS(unit.TestUnitApplicationTLS):
- def setUpClass():
- unit.TestUnit().check_modules('python', 'openssl')
+class TestTLS(TestApplicationTLS):
+ prerequisites = ['python', 'openssl']
def findall(self, pattern):
with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
return re.findall(pattern, f.read())
- def wait_for_record(self, pattern):
- for i in range(50):
- with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
- if re.search(pattern, f.read()) is not None:
- break
-
- time.sleep(0.1)
-
def openssl_date_to_sec_epoch(self, date):
return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')
def add_tls(self, application='empty', cert='default', port=7080):
- self.conf({
- "application": application,
- "tls": {
- "certificate": cert
- }
- }, 'listeners/*:' + str(port))
+ self.conf(
+ {
+ "pass": "applications/" + application,
+ "tls": {"certificate": cert}
+ },
+ 'listeners/*:' + str(port),
+ )
def remove_tls(self, application='empty', port=7080):
- self.conf({
- "application": application
- }, 'listeners/*:' + str(port))
+ self.conf(
+ {"pass": "applications/" + application}, 'listeners/*:' + str(port)
+ )
def test_tls_listener_option_add(self):
self.load('empty')
@@ -65,8 +57,11 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
self.certificate()
- self.assertIn('success', self.conf_delete('/certificates/default'),
- 'remove certificate')
+ self.assertIn(
+ 'success',
+ self.conf_delete('/certificates/default'),
+ 'remove certificate',
+ )
def test_tls_certificate_remove_used(self):
self.load('empty')
@@ -75,8 +70,11 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
self.add_tls()
- self.assertIn('error', self.conf_delete('/certificates/default'),
- 'remove certificate')
+ self.assertIn(
+ 'error',
+ self.conf_delete('/certificates/default'),
+ 'remove certificate',
+ )
def test_tls_certificate_remove_nonexisting(self):
self.load('empty')
@@ -85,10 +83,13 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
self.add_tls()
- self.assertIn('error', self.conf_delete('/certificates/blah'),
- 'remove nonexistings certificate')
+ self.assertIn(
+ 'error',
+ self.conf_delete('/certificates/blah'),
+ 'remove nonexistings certificate',
+ )
- @unittest.expectedFailure
+ @unittest.skip('not yet')
def test_tls_certificate_update(self):
self.load('empty')
@@ -100,18 +101,20 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
self.certificate()
- self.assertNotEqual(cert_old, self.get_server_certificate(),
- 'update certificate')
+ self.assertNotEqual(
+ cert_old, self.get_server_certificate(), 'update certificate'
+ )
- @unittest.expectedFailure
+ @unittest.skip('not yet')
def test_tls_certificate_key_incorrect(self):
self.load('empty')
self.certificate('first', False)
self.certificate('second', False)
- self.assertIn('error', self.certificate_load('first', 'second'),
- 'key incorrect')
+ self.assertIn(
+ 'error', self.certificate_load('first', 'second'), 'key incorrect'
+ )
def test_tls_certificate_change(self):
self.load('empty')
@@ -125,33 +128,53 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
self.add_tls(cert='new')
- self.assertNotEqual(cert_old, self.get_server_certificate(),
- 'change certificate')
+ self.assertNotEqual(
+ cert_old, self.get_server_certificate(), 'change certificate'
+ )
def test_tls_certificate_key_rsa(self):
self.load('empty')
self.certificate()
- self.assertEqual(self.conf_get('/certificates/default/key'),
- 'RSA (1024 bits)', 'certificate key rsa')
+ self.assertEqual(
+ self.conf_get('/certificates/default/key'),
+ 'RSA (1024 bits)',
+ 'certificate key rsa',
+ )
def test_tls_certificate_key_ec(self):
self.load('empty')
- subprocess.call(['openssl', 'ecparam', '-noout', '-genkey',
- '-out', self.testdir + '/ec.key',
- '-name', 'prime256v1'])
-
- subprocess.call(['openssl', 'req', '-x509', '-new',
- '-config', self.testdir + '/openssl.conf',
- '-key', self.testdir + '/ec.key', '-subj', '/CN=ec/',
- '-out', self.testdir + '/ec.crt'])
+ subprocess.call(
+ [
+ 'openssl',
+ 'ecparam',
+ '-noout',
+ '-genkey',
+ '-out', self.testdir + '/ec.key',
+ '-name', 'prime256v1',
+ ]
+ )
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'req',
+ '-x509',
+ '-new',
+ '-subj', '/CN=ec/',
+ '-config', self.testdir + '/openssl.conf',
+ '-key', self.testdir + '/ec.key',
+ '-out', self.testdir + '/ec.crt',
+ ]
+ )
self.certificate_load('ec')
- self.assertEqual(self.conf_get('/certificates/ec/key'), 'ECDH',
- 'certificate key ec')
+ self.assertEqual(
+ self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
+ )
def test_tls_certificate_chain_options(self):
self.load('empty')
@@ -164,36 +187,64 @@ class TestUnitTLS(unit.TestUnitApplicationTLS):
cert = chain[0]
- self.assertEqual(cert['subject']['common_name'], 'default',
- 'certificate subject common name')
- self.assertEqual(cert['issuer']['common_name'], 'default',
- 'certificate issuer common name')
-
- self.assertLess(abs(self.sec_epoch() -
- self.openssl_date_to_sec_epoch(cert['validity']['since'])), 5,
- 'certificate validity since')
self.assertEqual(
- self.openssl_date_to_sec_epoch(cert['validity']['until']) -
- self.openssl_date_to_sec_epoch(cert['validity']['since']), 2592000,
- 'certificate validity until')
+ cert['subject']['common_name'],
+ 'default',
+ 'certificate subject common name',
+ )
+ self.assertEqual(
+ cert['issuer']['common_name'],
+ 'default',
+ 'certificate issuer common name',
+ )
+
+ self.assertLess(
+ abs(
+ self.sec_epoch()
+ - self.openssl_date_to_sec_epoch(cert['validity']['since'])
+ ),
+ 5,
+ 'certificate validity since',
+ )
+ self.assertEqual(
+ self.openssl_date_to_sec_epoch(cert['validity']['until'])
+ - self.openssl_date_to_sec_epoch(cert['validity']['since']),
+ 2592000,
+ 'certificate validity until',
+ )
def test_tls_certificate_chain(self):
self.load('empty')
self.certificate('root', False)
- subprocess.call(['openssl', 'req', '-new', '-config',
- self.testdir + '/openssl.conf', '-subj', '/CN=int/',
- '-out', self.testdir + '/int.csr',
- '-keyout', self.testdir + '/int.key'])
-
- subprocess.call(['openssl', 'req', '-new', '-config',
- self.testdir + '/openssl.conf', '-subj', '/CN=end/',
- '-out', self.testdir + '/end.csr',
- '-keyout', self.testdir + '/end.key'])
+ subprocess.call(
+ [
+ 'openssl',
+ 'req',
+ '-new',
+ '-subj', '/CN=int/',
+ '-config', self.testdir + '/openssl.conf',
+ '-out', self.testdir + '/int.csr',
+ '-keyout', self.testdir + '/int.key',
+ ]
+ )
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'req',
+ '-new',
+ '-subj', '/CN=end/',
+ '-config', self.testdir + '/openssl.conf',
+ '-out', self.testdir + '/end.csr',
+ '-keyout', self.testdir + '/end.key',
+ ]
+ )
with open(self.testdir + '/ca.conf', 'w') as f:
- f.write("""[ ca ]
+ f.write(
+ """[ ca ]
default_ca = myca
[ myca ]
@@ -209,11 +260,13 @@ x509_extensions = myca_extensions
commonName = supplied
[ myca_extensions ]
-basicConstraints = critical,CA:TRUE""" % {
- 'dir': self.testdir,
- 'database': self.testdir + '/certindex',
- 'certserial': self.testdir + '/certserial'
- })
+basicConstraints = critical,CA:TRUE"""
+ % {
+ 'dir': self.testdir,
+ 'database': self.testdir + '/certindex',
+ 'certserial': self.testdir + '/certserial',
+ }
+ )
with open(self.testdir + '/certserial', 'w') as f:
f.write('1000')
@@ -221,26 +274,42 @@ basicConstraints = critical,CA:TRUE""" % {
with open(self.testdir + '/certindex', 'w') as f:
f.write('')
- subprocess.call(['openssl', 'ca', '-batch',
- '-config', self.testdir + '/ca.conf',
- '-keyfile', self.testdir + '/root.key',
- '-cert', self.testdir + '/root.crt',
- '-subj', '/CN=int/',
- '-in', self.testdir + '/int.csr',
- '-out', self.testdir + '/int.crt'])
-
- subprocess.call(['openssl', 'ca', '-batch',
- '-config', self.testdir + '/ca.conf',
- '-keyfile', self.testdir + '/int.key',
- '-cert', self.testdir + '/int.crt',
- '-subj', '/CN=end/',
- '-in', self.testdir + '/end.csr',
- '-out', self.testdir + '/end.crt'])
-
- with open(self.testdir + '/end-int.crt', 'wb') as crt, \
- open(self.testdir + '/end.crt', 'rb') as end, \
- open(self.testdir + '/int.crt', 'rb') as int:
- crt.write(end.read() + int.read())
+ subprocess.call(
+ [
+ 'openssl',
+ 'ca',
+ '-batch',
+ '-subj', '/CN=int/',
+ '-config', self.testdir + '/ca.conf',
+ '-keyfile', self.testdir + '/root.key',
+ '-cert', self.testdir + '/root.crt',
+ '-in', self.testdir + '/int.csr',
+ '-out', self.testdir + '/int.crt',
+ ]
+ )
+
+ subprocess.call(
+ [
+ 'openssl',
+ 'ca',
+ '-batch',
+ '-subj', '/CN=end/',
+ '-config', self.testdir + '/ca.conf',
+ '-keyfile', self.testdir + '/int.key',
+ '-cert', self.testdir + '/int.crt',
+ '-in', self.testdir + '/end.csr',
+ '-out', self.testdir + '/end.crt',
+ ]
+ )
+
+ crt_path = self.testdir + '/end-int.crt'
+ end_path = self.testdir + '/end.crt'
+ int_path = self.testdir + '/int.crt'
+
+ with open(crt_path, 'wb') as crt, \
+ open(end_path, 'rb') as end, \
+ open(int_path, 'rb') as int:
+ crt.write(end.read() + int.read())
self.context = ssl.create_default_context()
self.context.check_hostname = False
@@ -249,15 +318,24 @@ basicConstraints = critical,CA:TRUE""" % {
# incomplete chain
- self.assertIn('success', self.certificate_load('end', 'end'),
- 'certificate chain end upload')
+ self.assertIn(
+ 'success',
+ self.certificate_load('end', 'end'),
+ 'certificate chain end upload',
+ )
chain = self.conf_get('/certificates/end/chain')
self.assertEqual(len(chain), 1, 'certificate chain end length')
- self.assertEqual(chain[0]['subject']['common_name'], 'end',
- 'certificate chain end subject common name')
- self.assertEqual(chain[0]['issuer']['common_name'], 'int',
- 'certificate chain end issuer common name')
+ self.assertEqual(
+ chain[0]['subject']['common_name'],
+ 'end',
+ 'certificate chain end subject common name',
+ )
+ self.assertEqual(
+ chain[0]['issuer']['common_name'],
+ 'int',
+ 'certificate chain end issuer common name',
+ )
self.add_tls(cert='end')
@@ -270,153 +348,249 @@ basicConstraints = critical,CA:TRUE""" % {
# intermediate
- self.assertIn('success', self.certificate_load('int', 'int'),
- 'certificate chain int upload')
+ self.assertIn(
+ 'success',
+ self.certificate_load('int', 'int'),
+ 'certificate chain int upload',
+ )
chain = self.conf_get('/certificates/int/chain')
self.assertEqual(len(chain), 1, 'certificate chain int length')
- self.assertEqual(chain[0]['subject']['common_name'], 'int',
- 'certificate chain int subject common name')
- self.assertEqual(chain[0]['issuer']['common_name'], 'root',
- 'certificate chain int issuer common name')
+ self.assertEqual(
+ chain[0]['subject']['common_name'],
+ 'int',
+ 'certificate chain int subject common name',
+ )
+ self.assertEqual(
+ chain[0]['issuer']['common_name'],
+ 'root',
+ 'certificate chain int issuer common name',
+ )
self.add_tls(cert='int')
- self.assertEqual(self.get_ssl()['status'], 200,
- 'certificate chain intermediate')
+ self.assertEqual(
+ self.get_ssl()['status'], 200, 'certificate chain intermediate'
+ )
# intermediate server
- self.assertIn('success', self.certificate_load('end-int', 'end'),
- 'certificate chain end-int upload')
+ self.assertIn(
+ 'success',
+ self.certificate_load('end-int', 'end'),
+ 'certificate chain end-int upload',
+ )
chain = self.conf_get('/certificates/end-int/chain')
self.assertEqual(len(chain), 2, 'certificate chain end-int length')
- self.assertEqual(chain[0]['subject']['common_name'], 'end',
- 'certificate chain end-int int subject common name')
- self.assertEqual(chain[0]['issuer']['common_name'], 'int',
- 'certificate chain end-int int issuer common name')
- self.assertEqual(chain[1]['subject']['common_name'], 'int',
- 'certificate chain end-int end subject common name')
- self.assertEqual(chain[1]['issuer']['common_name'], 'root',
- 'certificate chain end-int end issuer common name')
+ self.assertEqual(
+ chain[0]['subject']['common_name'],
+ 'end',
+ 'certificate chain end-int int subject common name',
+ )
+ self.assertEqual(
+ chain[0]['issuer']['common_name'],
+ 'int',
+ 'certificate chain end-int int issuer common name',
+ )
+ self.assertEqual(
+ chain[1]['subject']['common_name'],
+ 'int',
+ 'certificate chain end-int end subject common name',
+ )
+ self.assertEqual(
+ chain[1]['issuer']['common_name'],
+ 'root',
+ 'certificate chain end-int end issuer common name',
+ )
self.add_tls(cert='end-int')
- self.assertEqual(self.get_ssl()['status'], 200,
- 'certificate chain intermediate server')
+ self.assertEqual(
+ self.get_ssl()['status'],
+ 200,
+ 'certificate chain intermediate server',
+ )
- @unittest.expectedFailure
+ @unittest.skip('not yet')
def test_tls_reconfigure(self):
self.load('empty')
+ self.assertEqual(self.get()['status'], 200, 'init')
+
self.certificate()
- (resp, sock) = self.get(headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive'
- }, start=True)
+ (resp, sock) = self.get(
+ headers={'Host': 'localhost', 'Connection': 'keep-alive'},
+ start=True,
+ read_timeout=1,
+ )
self.assertEqual(resp['status'], 200, 'initial status')
self.add_tls()
- self.assertEqual(self.get(sock=sock)['status'], 200,
- 'reconfigure status')
- self.assertEqual(self.get_ssl()['status'], 200,
- 'reconfigure tls status')
+ self.assertEqual(
+ self.get(sock=sock)['status'], 200, 'reconfigure status'
+ )
+ self.assertEqual(
+ self.get_ssl()['status'], 200, 'reconfigure tls status'
+ )
def test_tls_keepalive(self):
self.load('mirror')
+ self.assertEqual(self.get()['status'], 200, 'init')
+
self.certificate()
self.add_tls(application='mirror')
- (resp, sock) = self.post_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- 'Content-Type': 'text/html'
- }, start=True, body='0123456789')
+ (resp, sock) = self.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ 'Content-Type': 'text/html',
+ },
+ start=True,
+ body='0123456789',
+ read_timeout=1,
+ )
self.assertEqual(resp['body'], '0123456789', 'keepalive 1')
- resp = self.post_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'close',
- 'Content-Type': 'text/html'
- }, sock=sock, body='0123456789')
+ resp = self.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'close',
+ 'Content-Type': 'text/html',
+ },
+ sock=sock,
+ body='0123456789',
+ )
self.assertEqual(resp['body'], '0123456789', 'keepalive 2')
- @unittest.expectedFailure
+ @unittest.skip('not yet')
def test_tls_keepalive_certificate_remove(self):
self.load('empty')
+ self.assertEqual(self.get()['status'], 200, 'init')
+
self.certificate()
self.add_tls()
- (resp, sock) = self.get_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive'
- }, start=True)
+ (resp, sock) = self.get_ssl(
+ headers={'Host': 'localhost', 'Connection': 'keep-alive'},
+ start=True,
+ read_timeout=1,
+ )
- self.conf({
- "application": "empty"
- }, 'listeners/*:7080')
+ self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
self.conf_delete('/certificates/default')
try:
- resp = self.get_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'close'
- }, sock=sock)
+ resp = self.get_ssl(
+ headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
+ )
except:
resp = None
self.assertEqual(resp, None, 'keepalive remove certificate')
- @unittest.expectedFailure
+ @unittest.skip('not yet')
def test_tls_certificates_remove_all(self):
self.load('empty')
self.certificate()
- self.assertIn('success', self.conf_delete('/certificates'),
- 'remove all certificates')
+ self.assertIn(
+ 'success',
+ self.conf_delete('/certificates'),
+ 'remove all certificates',
+ )
def test_tls_application_respawn(self):
self.skip_alerts.append(r'process \d+ exited on signal 9')
self.load('mirror')
+ self.assertEqual(self.get()['status'], 200, 'init')
+
self.certificate()
self.conf('1', 'applications/mirror/processes')
self.add_tls(application='mirror')
- (resp, sock) = self.post_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'keep-alive',
- 'Content-Type': 'text/html'
- }, start=True, body='0123456789')
+ (resp, sock) = self.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'keep-alive',
+ 'Content-Type': 'text/html',
+ },
+ start=True,
+ body='0123456789',
+ read_timeout=1,
+ )
app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
subprocess.call(['kill', '-9', app_id])
- self.wait_for_record(re.compile(' (?!' + app_id +
- '#)(\d+)#\d+ "mirror" application started'))
+ self.wait_for_record(
+ re.compile(
+ ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
+ )
+ )
- resp = self.post_ssl(headers={
- 'Host': 'localhost',
- 'Connection': 'close',
- 'Content-Type': 'text/html'
- }, sock=sock, body='0123456789')
+ resp = self.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Connection': 'close',
+ 'Content-Type': 'text/html',
+ },
+ sock=sock,
+ body='0123456789',
+ )
self.assertEqual(resp['status'], 200, 'application respawn status')
- self.assertEqual(resp['body'], '0123456789', 'application respawn body')
+ self.assertEqual(
+ resp['body'], '0123456789', 'application respawn body'
+ )
+
+ def test_tls_url_scheme(self):
+ self.load('variables')
+
+ self.assertEqual(
+ self.post(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Custom-Header': '',
+ 'Connection': 'close',
+ }
+ )['headers']['Wsgi-Url-Scheme'],
+ 'http',
+ 'url scheme http',
+ )
+
+ self.certificate()
+
+ self.add_tls(application='variables')
+
+ self.assertEqual(
+ self.post_ssl(
+ headers={
+ 'Host': 'localhost',
+ 'Content-Type': 'text/html',
+ 'Custom-Header': '',
+ 'Connection': 'close',
+ }
+ )['headers']['Wsgi-Url-Scheme'],
+ 'https',
+ 'url scheme https',
+ )
if __name__ == '__main__':
- TestUnitTLS.main()
+ TestTLS.main()