summaryrefslogblamecommitdiffhomepage
path: root/test/test_tls.py
blob: d9dcf237e9242e9a3bb6475dcf413eab65720dbd (plain) (tree)
1
2
3
4
5
6
7
8
9

         

          

                 
                                                    
 
 
                                  
                                                      




                                                                         



                                                                      
                  



                                                      

                                       

                                                         


                                                                               



























                                                                              




                                                      







                                               




                                                      







                                                      




                                                   
 
                             










                                                


                                                                         
 
                             





                                                 


                                                                              












                                                


                                                                         





                                           

                                                       
                              

                                  

                                          

                          

                           







                                                   

                                     











                                                           

                                     
         


                                   


                                                                               











                                                                   
                         























                                                                           





                                         








                                                           

                                     










                                                           

                                     
         

                                                       

                         




                       
                   








                                 






                                                               






                                                          










                                                        

                                     












                                                       

                                     









                                                







                                                                      




                                                


                                                                       









                                                        











                                                                          




                                                


                                                                       









                                                        


                                


                                                                           


                             




                                                    


                                                                           



















                                                                


                                    




                                                    
 
                             


                                   

                                                           

                          


                                                                      
                           
         
 
                                                               
 
                      
 





                                                                    



                                 

                                                           



                                          







                                            
                           
         


                                                                   








                                            


                                                                   
                             


                                                    

                                                           



                          


                                                                      
                           
         
 
                                                                     


                                                 


                                                                               




                                                                    
                             




                                               




                                              

                                           







                                                       
                                  






                                            
                           
         




                                                                           

                                                                          




                                                                            
 








                                            

                                                                           


                                                                  
 


                                  











                                                




                                             











                                                
 



















                                                         
                          
                  
import io
import os
import re
import ssl
import subprocess
import unittest
from unit.applications.tls import TestApplicationTLS


class TestTLS(TestApplicationTLS):
    prerequisites = {'modules': ['python', 'openssl']}

    def findall(self, pattern):
        with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
            return re.findall(pattern, f.read())

    def openssl_date_to_sec_epoch(self, date):
        return self.date_to_sec_epoch(date, '%b %d %H:%M:%S %Y %Z')

    def add_tls(self, application='empty', cert='default', port=7080):
        self.conf(
            {
                "pass": "applications/" + application,
                "tls": {"certificate": cert}
            },
            'listeners/*:' + str(port),
        )

    def remove_tls(self, application='empty', port=7080):
        self.conf(
            {"pass": "applications/" + application}, 'listeners/*:' + str(port)
        )

    def test_tls_listener_option_add(self):
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertEqual(self.get_ssl()['status'], 200, 'add listener option')

    def test_tls_listener_option_remove(self):
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.get_ssl()

        self.remove_tls()

        self.assertEqual(self.get()['status'], 200, 'remove listener option')

    def test_tls_certificate_remove(self):
        self.load('empty')

        self.certificate()

        self.assertIn(
            'success',
            self.conf_delete('/certificates/default'),
            'remove certificate',
        )

    def test_tls_certificate_remove_used(self):
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertIn(
            'error',
            self.conf_delete('/certificates/default'),
            'remove certificate',
        )

    def test_tls_certificate_remove_nonexisting(self):
        self.load('empty')

        self.certificate()

        self.add_tls()

        self.assertIn(
            'error',
            self.conf_delete('/certificates/blah'),
            'remove nonexistings certificate',
        )

    @unittest.skip('not yet')
    def test_tls_certificate_update(self):
        self.load('empty')

        self.certificate()

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.certificate()

        self.assertNotEqual(
            cert_old, self.get_server_certificate(), 'update certificate'
        )

    @unittest.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        self.load('empty')

        self.certificate('first', False)
        self.certificate('second', False)

        self.assertIn(
            'error', self.certificate_load('first', 'second'), 'key incorrect'
        )

    def test_tls_certificate_change(self):
        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()

        cert_old = self.get_server_certificate()

        self.add_tls(cert='new')

        self.assertNotEqual(
            cert_old, self.get_server_certificate(), 'change certificate'
        )

    def test_tls_certificate_key_rsa(self):
        self.load('empty')

        self.certificate()

        self.assertEqual(
            self.conf_get('/certificates/default/key'),
            'RSA (2048 bits)',
            'certificate key rsa',
        )

    def test_tls_certificate_key_ec(self):
        self.load('empty')

        self.openssl_conf()

        subprocess.call(
            [
                'openssl',
                'ecparam',
                '-noout',
                '-genkey',
                '-out',   self.testdir + '/ec.key',
                '-name',  'prime256v1',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.call(
            [
                'openssl',
                'req',
                '-x509',
                '-new',
                '-subj',    '/CN=ec/',
                '-config',  self.testdir + '/openssl.conf',
                '-key',     self.testdir + '/ec.key',
                '-out',     self.testdir + '/ec.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        self.certificate_load('ec')

        self.assertEqual(
            self.conf_get('/certificates/ec/key'), 'ECDH', 'certificate key ec'
        )

    def test_tls_certificate_chain_options(self):
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        self.assertEqual(len(chain), 1, 'certificate chain length')

        cert = chain[0]

        self.assertEqual(
            cert['subject']['common_name'],
            'default',
            'certificate subject common name',
        )
        self.assertEqual(
            cert['issuer']['common_name'],
            'default',
            'certificate issuer common name',
        )

        self.assertLess(
            abs(
                self.sec_epoch()
                - self.openssl_date_to_sec_epoch(cert['validity']['since'])
            ),
            5,
            'certificate validity since',
        )
        self.assertEqual(
            self.openssl_date_to_sec_epoch(cert['validity']['until'])
            - self.openssl_date_to_sec_epoch(cert['validity']['since']),
            2592000,
            'certificate validity until',
        )

    def test_tls_certificate_chain(self):
        self.load('empty')

        self.certificate('root', False)

        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj',    '/CN=int/',
                '-config',  self.testdir + '/openssl.conf',
                '-out',     self.testdir + '/int.csr',
                '-keyout',  self.testdir + '/int.key',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.call(
            [
                'openssl',
                'req',
                '-new',
                '-subj',    '/CN=end/',
                '-config',  self.testdir + '/openssl.conf',
                '-out',     self.testdir + '/end.csr',
                '-keyout',  self.testdir + '/end.key',
            ],
            stderr=subprocess.STDOUT,
        )

        with open(self.testdir + '/ca.conf', 'w') as f:
            f.write(
                """[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions

[ myca_policy ]
commonName = supplied

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
                % {
                    'dir': self.testdir,
                    'database': self.testdir + '/certindex',
                    'certserial': self.testdir + '/certserial',
                }
            )

        with open(self.testdir + '/certserial', 'w') as f:
            f.write('1000')

        with open(self.testdir + '/certindex', 'w') as f:
            f.write('')

        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj',     '/CN=int/',
                '-config',   self.testdir + '/ca.conf',
                '-keyfile',  self.testdir + '/root.key',
                '-cert',     self.testdir + '/root.crt',
                '-in',       self.testdir + '/int.csr',
                '-out',      self.testdir + '/int.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        subprocess.call(
            [
                'openssl',
                'ca',
                '-batch',
                '-subj',     '/CN=end/',
                '-config',   self.testdir + '/ca.conf',
                '-keyfile',  self.testdir + '/int.key',
                '-cert',     self.testdir + '/int.crt',
                '-in',       self.testdir + '/end.csr',
                '-out',      self.testdir + '/end.crt',
            ],
            stderr=subprocess.STDOUT,
        )

        crt_path = self.testdir + '/end-int.crt'
        end_path = self.testdir + '/end.crt'
        int_path = self.testdir + '/int.crt'

        with open(crt_path, 'wb') as crt, \
             open(end_path, 'rb') as end, \
             open(int_path, 'rb') as int:
            crt.write(end.read() + int.read())

        self.context = ssl.create_default_context()
        self.context.check_hostname = False
        self.context.verify_mode = ssl.CERT_REQUIRED
        self.context.load_verify_locations(self.testdir + '/root.crt')

        # incomplete chain

        self.assertIn(
            'success',
            self.certificate_load('end', 'end'),
            'certificate chain end upload',
        )

        chain = self.conf_get('/certificates/end/chain')
        self.assertEqual(len(chain), 1, 'certificate chain end length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'end',
            'certificate chain end subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'int',
            'certificate chain end issuer common name',
        )

        self.add_tls(cert='end')

        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        self.assertEqual(resp, None, 'certificate chain incomplete chain')

        # intermediate

        self.assertIn(
            'success',
            self.certificate_load('int', 'int'),
            'certificate chain int upload',
        )

        chain = self.conf_get('/certificates/int/chain')
        self.assertEqual(len(chain), 1, 'certificate chain int length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'int',
            'certificate chain int subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'root',
            'certificate chain int issuer common name',
        )

        self.add_tls(cert='int')

        self.assertEqual(
            self.get_ssl()['status'], 200, 'certificate chain intermediate'
        )

        # intermediate server

        self.assertIn(
            'success',
            self.certificate_load('end-int', 'end'),
            'certificate chain end-int upload',
        )

        chain = self.conf_get('/certificates/end-int/chain')
        self.assertEqual(len(chain), 2, 'certificate chain end-int length')
        self.assertEqual(
            chain[0]['subject']['common_name'],
            'end',
            'certificate chain end-int int subject common name',
        )
        self.assertEqual(
            chain[0]['issuer']['common_name'],
            'int',
            'certificate chain end-int int issuer common name',
        )
        self.assertEqual(
            chain[1]['subject']['common_name'],
            'int',
            'certificate chain end-int end subject common name',
        )
        self.assertEqual(
            chain[1]['issuer']['common_name'],
            'root',
            'certificate chain end-int end issuer common name',
        )

        self.add_tls(cert='end-int')

        self.assertEqual(
            self.get_ssl()['status'],
            200,
            'certificate chain intermediate server',
        )

    @unittest.skip('not yet')
    def test_tls_reconfigure(self):
        self.load('empty')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        (resp, sock) = self.get(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        self.assertEqual(resp['status'], 200, 'initial status')

        self.add_tls()

        self.assertEqual(
            self.get(sock=sock)['status'], 200, 'reconfigure status'
        )
        self.assertEqual(
            self.get_ssl()['status'], 200, 'reconfigure tls status'
        )

    def test_tls_keepalive(self):
        self.load('mirror')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        self.add_tls(application='mirror')

        (resp, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        self.assertEqual(resp['body'], '0123456789', 'keepalive 1')

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        self.assertEqual(resp['body'], '0123456789', 'keepalive 2')

    @unittest.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        self.load('empty')

        self.assertEqual(self.get()['status'], 200, 'init')

        self.certificate()

        self.add_tls()

        (resp, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        self.conf({"pass": "applications/empty"}, 'listeners/*:7080')
        self.conf_delete('/certificates/default')

        try:
            resp = self.get_ssl(
                headers={'Host': 'localhost', 'Connection': 'close'}, sock=sock
            )
        except:
            resp = None

        self.assertEqual(resp, None, 'keepalive remove certificate')

    @unittest.skip('not yet')
    def test_tls_certificates_remove_all(self):
        self.load('empty')

        self.certificate()

        self.assertIn(
            'success',
            self.conf_delete('/certificates'),
            'remove all certificates',
        )

    def test_tls_application_respawn(self):
        self.load('mirror')

        self.certificate()

        self.conf('1', 'applications/mirror/processes')

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'keep-alive',
                'Content-Type': 'text/html',
            },
            start=True,
            body='0123456789',
            read_timeout=1,
        )

        app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]

        subprocess.call(['kill', '-9', app_id])

        self.skip_alerts.append(r'process %s exited on signal 9' % app_id)

        self.wait_for_record(
            re.compile(
                ' (?!' + app_id + '#)(\d+)#\d+ "mirror" application started'
            )
        )

        resp = self.post_ssl(
            headers={
                'Host': 'localhost',
                'Connection': 'close',
                'Content-Type': 'text/html',
            },
            sock=sock,
            body='0123456789',
        )

        self.assertEqual(resp['status'], 200, 'application respawn status')
        self.assertEqual(
            resp['body'], '0123456789', 'application respawn body'
        )

    def test_tls_url_scheme(self):
        self.load('variables')

        self.assertEqual(
            self.post(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme'],
            'http',
            'url scheme http',
        )

        self.certificate()

        self.add_tls(application='variables')

        self.assertEqual(
            self.post_ssl(
                headers={
                    'Host': 'localhost',
                    'Content-Type': 'text/html',
                    'Custom-Header': '',
                    'Connection': 'close',
                }
            )['headers']['Wsgi-Url-Scheme'],
            'https',
            'url scheme https',
        )

    def test_tls_big_upload(self):
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        res = self.post_ssl(body={
            'file': {
                'filename': filename,
                'type': 'text/plain',
                'data': io.StringIO(data),
            }
        })
        self.assertEqual(res['status'], 200, 'status ok')
        self.assertEqual(res['body'], filename + data)

if __name__ == '__main__':
    TestTLS.main()