# summary | refs | log | blame | commit | diff | homepage
# path: root/test/test_tls.py
# blob: 06c38d0b73a065e4c6c427625ed88a6774cac654 (plain) (tree)
1
2
3
4
5
6
7
8
9
         
          
                 
           
 
             
                                                    
                              
 
 
                                  
                                                                    
 
                                              
                                                             

                                                                      
                                      
             
                                                      
                                             
              
                                  
         

                                                         
                                      
                                                                          
         
 
                                                              
                                                                 
 
                                






                          
                                                  
                       
                                                
                          
                                                




                                     
                                                          
                    
                          


                 

                                      

                    
                                     








                                      

             
                                                             

                           
                                                            

                       
                                                                 


                                               
                                




                          
                                             
                           
                                                
                        
                                                
                      
                                               
                       
                                               







                                                       
                                                                           
 






                                           
                                                                     











                                              
                                                                    





                                          


                                             







                                               


                                           







                                                      


                                            
 
                                






                                          
                                                                  


                          


                                                      
 
                                





                                                 


                                                








                                          
                                                                  


                                


                                                      





                                           


                                                                           
 
                                                    

                          

                           
                                




                          
                       
                                     

                             

                                     

         
                                




                          


                          
                                           
                       
                                     
                       
                                     

                                     
         


                                   


                                                           







                                                            
                                                          


                       





                                                       
 
                


                                                                           
             
                

                                       
                                                                     


                                                                       
 
                                                   



                                       

                       
 
                               
 

                                       
 


                                            
 


                                                                            
                                              
 
                                          


                          


                                                  

                                                        






                                                              







                                 
                                                                 


                      


                                                  

                                                        






                                                              


                                
                                                                                


                             


                                                  

                                                            












                                                                  


                                    


                                                  
 













                                                        
                                   




                                                
                                                                   
                 
                                                            

                                                



                                             

             
                                                                               














                                                                         





































                                                                            






















                                                                            


                                 
                                                  
 



                                          



                                           


                              
                           
         
 
                                                          
 



                                      



                              
 
                                                          
 





















                                                          
                                


                                                    
                                                  
 



                          


                                                                      
                           
         
 



                                                                     

            
                                          



                                 


                       
                                                           
 
                                




                                               


                                             
 
                                                       



                           
                                                                           


                                          
                                  


                                           


                              
                           
         


                                                                           
                                                       
 
                                                           
 
                             
                                                                    
         
 
                                                          
 

                                                                       
 


                                  
                






                                                


                                           




                                             
                






                                                


                                           
 









                                          






                                              
             

                                                
                                                 











                                                                      
import io
import ssl
import subprocess
import time

import pytest
from unit.applications.tls import TestApplicationTLS
from unit.option import option


class TestTLS(TestApplicationTLS):
    """TLS listener and certificate tests.

    The helper methods below drive the ``openssl`` command line tool to
    build requests and CA-signed certificates; the ``test_*`` methods
    exercise certificate management and TLS-enabled listeners.
    """

    # Declares that the 'python' and 'openssl' modules are needed, with no
    # constraint on their version ('any').
    prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

    def openssl_date_to_sec_epoch(self, date):
        """Convert an OpenSSL-style date string via ``date_to_sec_epoch``.

        *date* is parsed with the ``'%b %d %X %Y %Z'`` template; the value
        returned is whatever ``self.date_to_sec_epoch`` yields (presumably
        seconds since the epoch, going by its name).
        """
        openssl_template = '%b %d %X %Y %Z'
        return self.date_to_sec_epoch(date, openssl_template)

    def add_tls(self, application='empty', cert='default', port=7080):
        """Configure listener ``*:<port>`` to serve *application* over TLS.

        The listener object passes requests to ``applications/<application>``
        and references certificate *cert*.  The configuration response is
        asserted to contain 'success'.
        """
        listener = {
            "pass": f"applications/{application}",
            "tls": {"certificate": cert},
        }
        result = self.conf(listener, f'listeners/*:{port}')

        assert 'success' in result

    def remove_tls(self, application='empty', port=7080):
        """Reconfigure listener ``*:<port>`` without a "tls" section.

        The listener object is replaced by one that only passes requests to
        ``applications/<application>``; the configuration response is
        asserted to contain 'success'.
        """
        listener = {"pass": f"applications/{application}"}
        result = self.conf(listener, f'listeners/*:{port}')

        assert 'success' in result

    def req(self, name='localhost', subject=None, x509=False):
        """Create a certificate request and key with ``openssl req -new``.

        The request is written to ``<temp_dir>/<name>.csr`` and the key to
        ``<temp_dir>/<name>.key``, using ``<temp_dir>/openssl.conf`` as the
        OpenSSL configuration.

        name    -- base name of the output files; also the default CN.
        subject -- value for ``-subj``; ``/CN=<name>/`` when None.
        x509    -- accepted but not used by this method.

        Raises subprocess.CalledProcessError if openssl exits non-zero.
        """
        if subject is None:
            subject = f'/CN={name}/'

        command = ['openssl', 'req', '-new']
        command += ['-subj', subject]
        command += ['-config', f'{option.temp_dir}/openssl.conf']
        command += ['-out', f'{option.temp_dir}/{name}.csr']
        command += ['-keyout', f'{option.temp_dir}/{name}.key']

        # stderr is folded into stdout so it ends up in the raised error.
        subprocess.check_output(command, stderr=subprocess.STDOUT)

    def generate_ca_conf(self):
        with open(f'{option.temp_dir}/ca.conf', 'w') as f:
            f.write(
                f"""[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = {option.temp_dir}
database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
            )

        with open(f'{option.temp_dir}/certserial', 'w') as f:
            f.write('1000')

        with open(f'{option.temp_dir}/certindex', 'w') as f:
            f.write('')

        with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
            f.write('')

    def ca(self, cert='root', out='localhost'):
        """Sign ``<out>.csr`` with the *cert* key pair using ``openssl ca``.

        cert -- base name of the signing ``.key``/``.crt`` files.
        out  -- base name of the request (``.csr``) to sign and of the
                certificate (``.crt``) to produce.

        All files live in the temp directory and ``ca.conf`` from that
        directory is used as configuration.  Raises
        subprocess.CalledProcessError if openssl exits non-zero.
        """
        signer = f'{option.temp_dir}/{cert}'
        target = f'{option.temp_dir}/{out}'

        command = ['openssl', 'ca', '-batch']
        command += ['-config', f'{option.temp_dir}/ca.conf']
        command += ['-keyfile', f'{signer}.key']
        command += ['-cert', f'{signer}.crt']
        command += ['-in', f'{target}.csr']
        command += ['-out', f'{target}.crt']

        subprocess.check_output(command, stderr=subprocess.STDOUT)

    def set_certificate_req_context(self, cert='root'):
        """Install a client SSL context that requires a verified peer.

        The context is stored in ``self.context``; hostname checking is
        disabled, verification is mandatory, and ``<temp_dir>/<cert>.crt``
        is loaded as the trusted certificate.
        """
        # Stored on the instance first, then configured through the alias.
        ctx = self.context = ssl.create_default_context()

        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_REQUIRED

        trusted = f'{option.temp_dir}/{cert}.crt'
        ctx.load_verify_locations(trusted)

    def test_tls_listener_option_add(self):
        """A listener with the "tls" option answers an SSL request with 200."""
        self.load('empty')
        self.certificate()
        self.add_tls()

        resp = self.get_ssl()

        assert resp['status'] == 200, 'add listener option'

    def test_tls_listener_option_remove(self):
        """After the "tls" option is dropped, a plain request returns 200."""
        self.load('empty')
        self.certificate()
        self.add_tls()

        # One request over TLS before the option is removed again.
        self.get_ssl()

        self.remove_tls()

        resp = self.get()

        assert resp['status'] == 200, 'remove listener option'

    def test_tls_certificate_remove(self):
        """Deleting a certificate that no listener references succeeds."""
        self.load('empty')
        self.certificate()

        result = self.conf_delete('/certificates/default')

        assert 'success' in result, 'remove certificate'

    def test_tls_certificate_remove_used(self):
        """Deleting a certificate referenced by a listener is rejected."""
        self.load('empty')
        self.certificate()
        self.add_tls()

        result = self.conf_delete('/certificates/default')

        assert 'error' in result, 'remove certificate'

    def test_tls_certificate_remove_nonexisting(self):
        """Deleting a certificate name that was never uploaded is an error."""
        self.load('empty')
        self.certificate()
        self.add_tls()

        result = self.conf_delete('/certificates/blah')

        assert 'error' in result, 'remove nonexistings certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_update(self):
        """Re-creating the certificate should change what the server presents.

        Currently skipped ('not yet').
        """
        server = ('127.0.0.1', 7080)

        self.load('empty')
        self.certificate()
        self.add_tls()

        cert_old = ssl.get_server_certificate(server)

        # Same certificate name again, then fetch what the listener serves.
        self.certificate()

        cert_new = ssl.get_server_certificate(server)

        assert cert_old != cert_new, 'update certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificate_key_incorrect(self):
        """Uploading certificate 'first' with key 'second' should fail.

        Currently skipped ('not yet').
        """
        self.load('empty')

        # Second argument False: presumably "generate only, do not upload"
        # (defined by the base class) -- confirm there.
        for name in ('first', 'second'):
            self.certificate(name, False)

        result = self.certificate_load('first', 'second')

        assert 'error' in result, 'key incorrect'

    def test_tls_certificate_change(self):
        """Switching the listener to another certificate changes what it serves."""
        server = ('127.0.0.1', 7080)

        self.load('empty')

        self.certificate()
        self.certificate('new')

        self.add_tls()
        cert_old = ssl.get_server_certificate(server)

        self.add_tls(cert='new')
        cert_new = ssl.get_server_certificate(server)

        assert cert_old != cert_new, 'change certificate'

    def test_tls_certificate_key_rsa(self):
        """The default certificate's key is reported as 'RSA (2048 bits)'."""
        self.load('empty')
        self.certificate()

        key_info = self.conf_get('/certificates/default/key')

        assert key_info == 'RSA (2048 bits)', 'certificate key rsa'

    def test_tls_certificate_key_ec(self, temp_dir):
        """A certificate built on a prime256v1 key is reported as 'ECDH'."""
        self.load('empty')

        self.openssl_conf()

        key_path = f'{temp_dir}/ec.key'

        # Step 1: generate the EC private key.
        genkey = ['openssl', 'ecparam', '-noout', '-genkey']
        genkey += ['-out', key_path]
        genkey += ['-name', 'prime256v1']
        subprocess.check_output(genkey, stderr=subprocess.STDOUT)

        # Step 2: self-signed certificate (CN=ec) using that key.
        selfsign = ['openssl', 'req', '-x509', '-new']
        selfsign += ['-subj', '/CN=ec/']
        selfsign += ['-config', f'{temp_dir}/openssl.conf']
        selfsign += ['-key', key_path]
        selfsign += ['-out', f'{temp_dir}/ec.crt']
        subprocess.check_output(selfsign, stderr=subprocess.STDOUT)

        self.certificate_load('ec')

        key_info = self.conf_get('/certificates/ec/key')

        assert key_info == 'ECDH', 'certificate key ec'

    def test_tls_certificate_chain_options(self):
        """Check the chain entry reported for the default certificate.

        Expects a single entry whose subject and issuer common names are
        both 'default', whose start of validity lies within 60 seconds of
        now, and whose validity period is exactly 2592000 seconds.
        """
        self.load('empty')

        self.certificate()

        chain = self.conf_get('/certificates/default/chain')

        assert len(chain) == 1, 'certificate chain length'

        entry = chain[0]
        subject_cn = entry['subject']['common_name']
        issuer_cn = entry['issuer']['common_name']

        assert subject_cn == 'default', 'certificate subject common name'
        assert issuer_cn == 'default', 'certificate issuer common name'

        validity = entry['validity']

        now = self.sec_epoch()
        since = self.openssl_date_to_sec_epoch(validity['since'])

        assert abs(now - since) < 60, 'certificate validity since'

        until = self.openssl_date_to_sec_epoch(validity['until'])

        # 2592000 s == 30 days.
        assert until - since == 2592000, 'certificate validity until'

    def test_tls_certificate_chain(self, temp_dir):
        """Exercise a root -> int -> end certificate chain.

        Builds the chain with the ``req``/``ca`` helpers, makes the client
        verify against the root certificate, and then checks three server
        configurations:

          1. 'end' alone      -- incomplete chain, the SSL request must fail;
          2. 'int' alone      -- issued directly by root, request returns 200;
          3. 'end-int' bundle -- end followed by int, request returns 200.

        For each uploaded certificate the reported chain (length, subject
        and issuer common names) is checked as well.
        """
        self.load('empty')

        self.certificate('root', False)

        self.req('int')
        self.req('end')

        self.generate_ca_conf()

        self.ca(cert='root', out='int')
        self.ca(cert='int', out='end')

        crt_path = f'{temp_dir}/end-int.crt'
        end_path = f'{temp_dir}/end.crt'
        int_path = f'{temp_dir}/int.crt'

        # Bundle file: end certificate first, then the intermediate one.
        # Handle names avoid shadowing the ``int`` builtin.
        with open(crt_path, 'wb') as bundle:
            with open(end_path, 'rb') as end_crt:
                bundle.write(end_crt.read())
            with open(int_path, 'rb') as int_crt:
                bundle.write(int_crt.read())

        self.set_certificate_req_context()

        # incomplete chain

        assert 'success' in self.certificate_load(
            'end', 'end'
        ), 'certificate chain end upload'

        chain = self.conf_get('/certificates/end/chain')
        assert len(chain) == 1, 'certificate chain end length'
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end issuer common name'

        self.add_tls(cert='end')

        # The client only trusts root, so the handshake is expected to fail.
        try:
            resp = self.get_ssl()
        except ssl.SSLError:
            resp = None

        assert resp is None, 'certificate chain incomplete chain'

        # intermediate

        assert 'success' in self.certificate_load(
            'int', 'int'
        ), 'certificate chain int upload'

        chain = self.conf_get('/certificates/int/chain')
        assert len(chain) == 1, 'certificate chain int length'
        assert (
            chain[0]['subject']['common_name'] == 'int'
        ), 'certificate chain int subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'root'
        ), 'certificate chain int issuer common name'

        self.add_tls(cert='int')

        assert self.get_ssl()['status'] == 200, 'certificate chain intermediate'

        # intermediate server

        assert 'success' in self.certificate_load(
            'end-int', 'end'
        ), 'certificate chain end-int upload'

        chain = self.conf_get('/certificates/end-int/chain')
        assert len(chain) == 2, 'certificate chain end-int length'
        # Messages name the certificate actually checked: chain[0] is the
        # end certificate, chain[1] the intermediate one.
        assert (
            chain[0]['subject']['common_name'] == 'end'
        ), 'certificate chain end-int end subject common name'
        assert (
            chain[0]['issuer']['common_name'] == 'int'
        ), 'certificate chain end-int end issuer common name'
        assert (
            chain[1]['subject']['common_name'] == 'int'
        ), 'certificate chain end-int int subject common name'
        assert (
            chain[1]['issuer']['common_name'] == 'root'
        ), 'certificate chain end-int int issuer common name'

        self.add_tls(cert='end-int')

        assert (
            self.get_ssl()['status'] == 200
        ), 'certificate chain intermediate server'

    def test_tls_certificate_chain_long(self, temp_dir):
        """Upload and serve a chain of root, eight intermediates and 'end'.

        The bundle ``all.crt`` holds every certificate except the root,
        ordered from 'end' down to 'int1'; the reported chain must have
        ``chain_length - 1`` entries and an SSL request must return 200.
        """
        self.load('empty')

        self.generate_ca_conf()

        # Minimum chain length is 3.
        chain_length = 10
        last_int = chain_length - 2

        # Key material: root certificate, then requests for int1..intN, end.
        self.certificate('root', False)
        for i in range(1, chain_length - 1):
            self.req(f'int{i}')
        self.req('end')

        # Signing: root -> int1 -> ... -> intN -> end.
        self.ca(cert='root', out='int1')
        for i in range(1, last_int):
            self.ca(cert=f'int{i}', out=f'int{(i + 1)}')
        self.ca(cert=f'int{last_int}', out='end')

        # Bundle order: end first, then intN down to int1.
        names = ['end'] + [f'int{i}' for i in range(last_int, 0, -1)]

        with open(f'{temp_dir}/all.crt', 'a') as bundle:
            for name in names:
                with open(f'{temp_dir}/{name}.crt') as cert:
                    bundle.write(cert.read())

        self.set_certificate_req_context()

        upload = self.certificate_load('all', 'end')
        assert 'success' in upload, 'certificate chain upload'

        chain = self.conf_get('/certificates/all/chain')
        assert len(chain) == chain_length - 1, 'certificate chain length'

        self.add_tls(cert='all')

        resp = self.get_ssl()
        assert resp['status'] == 200, 'certificate chain long'

    def test_tls_certificate_empty_cn(self, temp_dir):
        """A certificate requested with subject '/' reports an empty subject."""
        self.certificate('root', False)

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        upload = self.certificate_load('localhost', 'localhost')
        assert 'success' in upload

        leaf = self.conf_get('/certificates/localhost')['chain'][0]

        assert leaf['subject'] == {}, 'empty subject'
        assert leaf['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san(self, temp_dir):
        """With subject '/' and alt names set, only 'alt_names' is reported."""
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True, alt_names=["example.com", "www.example.net"]
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        upload = self.certificate_load('localhost', 'localhost')
        assert 'success' in upload

        leaf = self.conf_get('/certificates/localhost')['chain'][0]
        expected_subject = {'alt_names': ['example.com', 'www.example.net']}

        assert leaf['subject'] == expected_subject, 'subject alt_names'
        assert leaf['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_certificate_empty_cn_san_ip(self):
        """An 'IP|...' alt name is not listed among the reported alt_names.

        The request is created with two DNS names plus 'IP|10.0.0.1'; the
        subject reported for the certificate must contain only the two DNS
        names.
        """
        self.certificate('root', False)

        self.openssl_conf(
            rewrite=True,
            alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
        )

        self.req(subject='/')

        self.generate_ca_conf()
        self.ca()

        self.set_certificate_req_context()

        upload = self.certificate_load('localhost', 'localhost')
        assert 'success' in upload

        leaf = self.conf_get('/certificates/localhost')['chain'][0]
        expected_subject = {'alt_names': ['example.com', 'www.example.net']}

        assert leaf['subject'] == expected_subject, 'subject alt_names'
        assert leaf['issuer']['common_name'] == 'root', 'issuer'

    def test_tls_keepalive(self):
        """Two POSTs over one TLS connection both get their body echoed."""
        payload = '0123456789'

        self.load('mirror')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls(application='mirror')

        # First request keeps the connection open and hands back the socket.
        first, sock = self.post_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            body=payload,
            read_timeout=1,
        )

        assert first['body'] == '0123456789', 'keepalive 1'

        # Second request reuses that socket and asks for it to be closed.
        second = self.post_ssl(
            headers={'Host': 'localhost', 'Connection': 'close'},
            sock=sock,
            body=payload,
        )

        assert second['body'] == '0123456789', 'keepalive 2'

    def test_tls_no_close_notify(self):
        """Hold a TLS connection open for five seconds, then close the socket.

        Uses a routes-only configuration that returns 200 for every request.
        The test has no assertion after the request; presumably it passes as
        long as nothing goes wrong on the server side -- confirm against the
        test framework's log checks.
        """
        self.certificate()

        tls_listener = {
            "pass": "routes",
            "tls": {"certificate": "default"},
        }
        config = {
            "listeners": {"*:7080": tls_listener},
            "routes": [{"action": {"return": 200}}],
            "applications": {},
        }

        result = self.conf(config)
        assert 'success' in result, 'load application configuration'

        (_, sock) = self.get_ssl(start=True)

        time.sleep(5)

        sock.close()

    @pytest.mark.skip('not yet')
    def test_tls_keepalive_certificate_remove(self):
        """A kept-alive TLS connection must fail once its certificate is gone.

        Opens a keep-alive SSL connection, switches the listener back to
        plain mode, deletes the certificate and expects the next request on
        the old socket to fail.  Currently skipped ('not yet').
        """
        self.load('empty')

        assert self.get()['status'] == 200, 'init'

        self.certificate()

        self.add_tls()

        (resp, sock) = self.get_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            read_timeout=1,
        )

        assert 'success' in self.conf(
            {"pass": "applications/empty"}, 'listeners/*:7080'
        )
        assert 'success' in self.conf_delete('/certificates/default')

        # Any ordinary failure counts as "no response".  ``except Exception``
        # lets KeyboardInterrupt/SystemExit propagate without a manual
        # re-raise.
        try:
            resp = self.get_ssl(sock=sock)
        except Exception:
            resp = None

        assert resp is None, 'keepalive remove certificate'

    @pytest.mark.skip('not yet')
    def test_tls_certificates_remove_all(self):
        """Deleting '/certificates' as a whole should succeed.

        Currently skipped ('not yet').
        """
        self.load('empty')
        self.certificate()

        result = self.conf_delete('/certificates')

        assert 'success' in result, 'remove all certificates'

    def test_tls_application_respawn(self, skip_alert):
        """A kept-alive TLS connection still works after the app is killed.

        The 'mirror' application is limited to one process, that process is
        killed with signal 9, and once a new one is logged as started a
        second request on the same socket must return 200 with the echoed
        body.
        """
        payload = '0123456789'

        self.load('mirror')

        self.certificate()

        result = self.conf('1', 'applications/mirror/processes')
        assert 'success' in result

        self.add_tls(application='mirror')

        (_, sock) = self.post_ssl(
            headers={'Host': 'localhost', 'Connection': 'keep-alive'},
            start=True,
            body=payload,
            read_timeout=1,
        )

        # PID of the application process, taken from the log.
        started = self.findall(r'(\d+)#\d+ "mirror" application started')
        app_id = started[0]

        subprocess.check_output(['kill', '-9', app_id])

        # The resulting alert is expected, so it is registered with
        # skip_alert.
        skip_alert(fr'process {app_id} exited on signal 9')

        # Wait for a start record that carries a different PID.
        respawned = (
            fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
        )
        self.wait_for_record(respawned)

        resp = self.post_ssl(sock=sock, body=payload)

        assert resp['status'] == 200, 'application respawn status'
        assert resp['body'] == '0123456789', 'application respawn body'

    def test_tls_url_scheme(self):
        """'Wsgi-Url-Scheme' is 'http' on a plain listener, 'https' with TLS."""
        headers = {
            'Host': 'localhost',
            'Content-Type': 'text/html',
            'Custom-Header': '',
            'Connection': 'close',
        }

        self.load('variables')

        # Each request gets its own copy of the header dict.
        plain = self.post(headers=dict(headers))

        assert plain['headers']['Wsgi-Url-Scheme'] == 'http', 'url scheme http'

        self.certificate()

        self.add_tls(application='variables')

        secure = self.post_ssl(headers=dict(headers))

        assert (
            secure['headers']['Wsgi-Url-Scheme'] == 'https'
        ), 'url scheme https'

    def test_tls_big_upload(self):
        """Upload a 90000-character file over TLS and check the echoed result.

        The response body must equal the file name followed by the file
        content.
        """
        self.load('upload')

        self.certificate()

        self.add_tls(application='upload')

        filename = 'test.txt'
        data = '0123456789' * 9000

        upload = {
            'filename': filename,
            'type': 'text/plain',
            'data': io.StringIO(data),
        }

        res = self.post_ssl(body={'file': upload})

        assert res['status'] == 200, 'status ok'
        assert res['body'] == f'{filename}{data}'

    def test_tls_multi_listener(self):
        """Two TLS listeners (ports 7080 and 7081) both answer with 200."""
        self.load('empty')

        self.certificate()

        # Default port (7080) first, then a second listener on 7081.
        self.add_tls()
        self.add_tls(port=7081)

        first = self.get_ssl()
        assert first['status'] == 200, 'listener #1'

        second = self.get_ssl(port=7081)
        assert second['status'] == 200, 'listener #2'