summaryrefslogblamecommitdiffhomepage
path: root/test/test_tls.py
blob: 54fdb665c1d7896ebd4ec13e269abc59468fd613 (plain) (tree)
1
2
3
4
5
6
7
8
9
         
          
                 
           
 
             
                                                
                              
 
                                                                
 














































                                                                  


                 

                                      

                    
                                     








                                      

         

                                                         
 

                                                        
 

                                                             
 
 



                                                                      
 
 

                                                             
 















                                              
 
 

                                   
 
                        
 
             
 
                                                                   
 
 

                                      
 
                        
 
             
 
                    
 
                
 
                                                                  
 
 

                                  
 
                        
 


                                           
 
 

                                       
 
                        
 
             
 


                                         
 
 

                                              
 
                        
 
             
 


                                         
 
 


                                  
 
                        
 
             
 
                                                              
 
                        
 


                                                  
 
 


                                         
 

                                       
 


                                              
 
 

                                  
 

                             
 
             
 
                                                              
 
                       
 


                                                  
 
 








                                                                         
 

                                          
 
                         
 












                                 
 
















                                       
 
                                 
 


                                                         
 
 


                                                                     
 
                        
 
                                                          
 
                                                      
 
                   
 





                                                   
 











                                                                       
 
 

                                         
 
                                     
 

              
 
                      
 

                              
 


















































































                                                                              
 
                      
 
























                                                          
 

                                                                           
 


                                                
 

                                                                     
 
                       
 


                                                                   
 
 

                                     
 
                    
 

                      
 
                                                                         
 


                                                                        
 
 

                                        
 


                                                                  
 
                    
 

                      
 
                                                                         
 




                                                                        
 
 

                                           
 



                                                                    
 
                    
 

                      
 
                                                                         
 




                                                                        
 
 

                         
 
                                                
 
                        
 
                                 
 








                                       
 
                                                      
 







                                  
 
                                                      
 










                                                      
              



                                                    
 
                                          
 
                 
 
                
 
 


                                            
 
                                                
 
                        
 
             
 




                                                                  
 



                                                                   
 

                                        
 

                             
 

                   
 
                                                       
 
 


                                       
 
                        
 


                                           
 
 

                                                                       
 
                        
 
                                                                         
 
                                 
 








                                       
 
                                                                  
 
                                                   
 
                                                       
 
                                                                             
 
                                                        
 

                                                                   
 
 

                            
 










                                            
 
                        
 
                                    
 










                                            
 
 















                                          
             




                                             
 

                              
 
                        
 

                      
 
                                                           
 
                                                                    
import io
import ssl
import subprocess
import time

import pytest
from unit.applications.tls import ApplicationTLS
from unit.option import option

prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}

client = ApplicationTLS()


def add_tls(application='empty', cert='default', port=7080):
    assert 'success' in client.conf(
        {
            "pass": f"applications/{application}",
            "tls": {"certificate": cert},
        },
        f'listeners/*:{port}',
    )


def ca(cert='root', out='localhost'):
    subprocess.check_output(
        [
            'openssl',
            'ca',
            '-batch',
            '-config',
            f'{option.temp_dir}/ca.conf',
            '-keyfile',
            f'{option.temp_dir}/{cert}.key',
            '-cert',
            f'{option.temp_dir}/{cert}.crt',
            '-in',
            f'{option.temp_dir}/{out}.csr',
            '-out',
            f'{option.temp_dir}/{out}.crt',
        ],
        stderr=subprocess.STDOUT,
    )


def context_cert_req(cert='root'):
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')

    return context


def generate_ca_conf():
    with open(f'{option.temp_dir}/ca.conf', 'w') as f:
        f.write(
            f"""[ ca ]
default_ca = myca

[ myca ]
new_certs_dir = {option.temp_dir}
database = {option.temp_dir}/certindex
default_md = sha256
policy = myca_policy
serial = {option.temp_dir}/certserial
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy

[ myca_policy ]
commonName = optional

[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
        )

    with open(f'{option.temp_dir}/certserial', 'w') as f:
        f.write('1000')

    with open(f'{option.temp_dir}/certindex', 'w') as f:
        f.write('')

    with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
        f.write('')


def remove_tls(application='empty', port=7080):
    assert 'success' in client.conf(
        {"pass": f"applications/{application}"}, f'listeners/*:{port}'
    )


def req(name='localhost', subject=None):
    subj = subject if subject is not None else f'/CN={name}/'

    subprocess.check_output(
        [
            'openssl',
            'req',
            '-new',
            '-subj',
            subj,
            '-config',
            f'{option.temp_dir}/openssl.conf',
            '-out',
            f'{option.temp_dir}/{name}.csr',
            '-keyout',
            f'{option.temp_dir}/{name}.key',
        ],
        stderr=subprocess.STDOUT,
    )


def test_tls_listener_option_add():
    client.load('empty')

    client.certificate()

    add_tls()

    assert client.get_ssl()['status'] == 200, 'add listener option'


def test_tls_listener_option_remove():
    client.load('empty')

    client.certificate()

    add_tls()

    client.get_ssl()

    remove_tls()

    assert client.get()['status'] == 200, 'remove listener option'


def test_tls_certificate_remove():
    client.load('empty')

    client.certificate()

    assert 'success' in client.conf_delete(
        '/certificates/default'
    ), 'remove certificate'


def test_tls_certificate_remove_used():
    client.load('empty')

    client.certificate()

    add_tls()

    assert 'error' in client.conf_delete(
        '/certificates/default'
    ), 'remove certificate'


def test_tls_certificate_remove_nonexisting():
    client.load('empty')

    client.certificate()

    add_tls()

    assert 'error' in client.conf_delete(
        '/certificates/blah'
    ), 'remove nonexistings certificate'


@pytest.mark.skip('not yet')
def test_tls_certificate_update():
    client.load('empty')

    client.certificate()

    add_tls()

    cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

    client.certificate()

    assert cert_old != ssl.get_server_certificate(
        ('127.0.0.1', 7080)
    ), 'update certificate'


@pytest.mark.skip('not yet')
def test_tls_certificate_key_incorrect():
    client.load('empty')

    client.certificate('first', False)
    client.certificate('second', False)

    assert 'error' in client.certificate_load(
        'first', 'second'
    ), 'key incorrect'


def test_tls_certificate_change():
    client.load('empty')

    client.certificate()
    client.certificate('new')

    add_tls()

    cert_old = ssl.get_server_certificate(('127.0.0.1', 7080))

    add_tls(cert='new')

    assert cert_old != ssl.get_server_certificate(
        ('127.0.0.1', 7080)
    ), 'change certificate'


def test_tls_certificate_key_rsa():
    client.load('empty')

    client.certificate()

    assert (
        client.conf_get('/certificates/default/key') == 'RSA (2048 bits)'
    ), 'certificate key rsa'


def test_tls_certificate_key_ec(temp_dir):
    client.load('empty')

    client.openssl_conf()

    subprocess.check_output(
        [
            'openssl',
            'ecparam',
            '-noout',
            '-genkey',
            '-out',
            f'{temp_dir}/ec.key',
            '-name',
            'prime256v1',
        ],
        stderr=subprocess.STDOUT,
    )

    subprocess.check_output(
        [
            'openssl',
            'req',
            '-x509',
            '-new',
            '-subj',
            '/CN=ec/',
            '-config',
            f'{temp_dir}/openssl.conf',
            '-key',
            f'{temp_dir}/ec.key',
            '-out',
            f'{temp_dir}/ec.crt',
        ],
        stderr=subprocess.STDOUT,
    )

    client.certificate_load('ec')

    assert (
        client.conf_get('/certificates/ec/key') == 'ECDH'
    ), 'certificate key ec'


def test_tls_certificate_chain_options(date_to_sec_epoch, sec_epoch):
    client.load('empty')
    date_format = '%b %d %X %Y %Z'

    client.certificate()

    chain = client.conf_get('/certificates/default/chain')

    assert len(chain) == 1, 'certificate chain length'

    cert = chain[0]

    assert (
        cert['subject']['common_name'] == 'default'
    ), 'certificate subject common name'
    assert (
        cert['issuer']['common_name'] == 'default'
    ), 'certificate issuer common name'

    assert (
        abs(
            sec_epoch
            - date_to_sec_epoch(cert['validity']['since'], date_format)
        )
        < 60
    ), 'certificate validity since'
    assert (
        date_to_sec_epoch(cert['validity']['until'], date_format)
        - date_to_sec_epoch(cert['validity']['since'], date_format)
        == 2592000
    ), 'certificate validity until'


def test_tls_certificate_chain(temp_dir):
    client.load('empty')

    client.certificate('root', False)

    req('int')
    req('end')

    generate_ca_conf()

    ca(cert='root', out='int')
    ca(cert='int', out='end')

    crt_path = f'{temp_dir}/end-int.crt'
    end_path = f'{temp_dir}/end.crt'
    int_path = f'{temp_dir}/int.crt'

    with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
        int_path, 'rb'
    ) as int:
        crt.write(end.read() + int.read())

    # incomplete chain

    assert 'success' in client.certificate_load(
        'end', 'end'
    ), 'certificate chain end upload'

    chain = client.conf_get('/certificates/end/chain')
    assert len(chain) == 1, 'certificate chain end length'
    assert (
        chain[0]['subject']['common_name'] == 'end'
    ), 'certificate chain end subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'int'
    ), 'certificate chain end issuer common name'

    add_tls(cert='end')

    ctx_cert_req = context_cert_req()
    try:
        resp = client.get_ssl(context=ctx_cert_req)
    except ssl.SSLError:
        resp = None

    assert resp is None, 'certificate chain incomplete chain'

    # intermediate

    assert 'success' in client.certificate_load(
        'int', 'int'
    ), 'certificate chain int upload'

    chain = client.conf_get('/certificates/int/chain')
    assert len(chain) == 1, 'certificate chain int length'
    assert (
        chain[0]['subject']['common_name'] == 'int'
    ), 'certificate chain int subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'root'
    ), 'certificate chain int issuer common name'

    add_tls(cert='int')

    assert client.get_ssl()['status'] == 200, 'certificate chain intermediate'

    # intermediate server

    assert 'success' in client.certificate_load(
        'end-int', 'end'
    ), 'certificate chain end-int upload'

    chain = client.conf_get('/certificates/end-int/chain')
    assert len(chain) == 2, 'certificate chain end-int length'
    assert (
        chain[0]['subject']['common_name'] == 'end'
    ), 'certificate chain end-int int subject common name'
    assert (
        chain[0]['issuer']['common_name'] == 'int'
    ), 'certificate chain end-int int issuer common name'
    assert (
        chain[1]['subject']['common_name'] == 'int'
    ), 'certificate chain end-int end subject common name'
    assert (
        chain[1]['issuer']['common_name'] == 'root'
    ), 'certificate chain end-int end issuer common name'

    add_tls(cert='end-int')

    assert (
        client.get_ssl(context=ctx_cert_req)['status'] == 200
    ), 'certificate chain intermediate server'


def test_tls_certificate_chain_long(temp_dir):
    client.load('empty')

    generate_ca_conf()

    # Minimum chain length is 3.
    chain_length = 10

    for i in range(chain_length):
        if i == 0:
            client.certificate('root', False)
        elif i == chain_length - 1:
            req('end')
        else:
            req(f'int{i}')

    for i in range(chain_length - 1):
        if i == 0:
            ca(cert='root', out='int1')
        elif i == chain_length - 2:
            ca(cert=f'int{(chain_length - 2)}', out='end')
        else:
            ca(cert=f'int{i}', out=f'int{(i + 1)}')

    for i in range(chain_length - 1, 0, -1):
        path = (
            f'{temp_dir}/end.crt'
            if i == chain_length - 1
            else f'{temp_dir}/int{i}.crt'
        )

        with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
            chain.write(cert.read())

    assert 'success' in client.certificate_load(
        'all', 'end'
    ), 'certificate chain upload'

    chain = client.conf_get('/certificates/all/chain')
    assert len(chain) == chain_length - 1, 'certificate chain length'

    add_tls(cert='all')

    assert (
        client.get_ssl(context=context_cert_req())['status'] == 200
    ), 'certificate chain long'


def test_tls_certificate_empty_cn():
    client.certificate('root', False)

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    assert cert['chain'][0]['subject'] == {}, 'empty subject'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_certificate_empty_cn_san():
    client.certificate('root', False)

    client.openssl_conf(
        rewrite=True, alt_names=["example.com", "www.example.net"]
    )

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    assert cert['chain'][0]['subject'] == {
        'alt_names': ['example.com', 'www.example.net']
    }, 'subject alt_names'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_certificate_empty_cn_san_ip():
    client.certificate('root', False)

    client.openssl_conf(
        rewrite=True,
        alt_names=['example.com', 'www.example.net', 'IP|10.0.0.1'],
    )

    req(subject='/')

    generate_ca_conf()
    ca()

    assert 'success' in client.certificate_load('localhost', 'localhost')

    cert = client.conf_get('/certificates/localhost')
    assert cert['chain'][0]['subject'] == {
        'alt_names': ['example.com', 'www.example.net']
    }, 'subject alt_names'
    assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'


def test_tls_keepalive():
    client.load('mirror')

    assert client.get()['status'] == 200, 'init'

    client.certificate()

    add_tls(application='mirror')

    (resp, sock) = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'keep-alive',
        },
        start=True,
        body='0123456789',
        read_timeout=1,
    )

    assert resp['body'] == '0123456789', 'keepalive 1'

    resp = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'close',
        },
        sock=sock,
        body='0123456789',
    )

    assert resp['body'] == '0123456789', 'keepalive 2'


def test_tls_no_close_notify():
    client.certificate()

    assert 'success' in client.conf(
        {
            "listeners": {
                "*:7080": {
                    "pass": "routes",
                    "tls": {"certificate": "default"},
                }
            },
            "routes": [{"action": {"return": 200}}],
            "applications": {},
        }
    ), 'load application configuration'

    (_, sock) = client.get_ssl(start=True)

    time.sleep(5)

    sock.close()


@pytest.mark.skip('not yet')
def test_tls_keepalive_certificate_remove():
    client.load('empty')

    assert client.get()['status'] == 200, 'init'

    client.certificate()

    add_tls()

    (resp, sock) = client.get_ssl(
        headers={'Host': 'localhost', 'Connection': 'keep-alive'},
        start=True,
        read_timeout=1,
    )

    assert 'success' in client.conf(
        {"pass": "applications/empty"}, 'listeners/*:7080'
    )
    assert 'success' in client.conf_delete('/certificates/default')

    try:
        resp = client.get_ssl(sock=sock)

    except KeyboardInterrupt:
        raise

    except:
        resp = None

    assert resp is None, 'keepalive remove certificate'


@pytest.mark.skip('not yet')
def test_tls_certificates_remove_all():
    client.load('empty')

    client.certificate()

    assert 'success' in client.conf_delete(
        '/certificates'
    ), 'remove all certificates'


def test_tls_application_respawn(findall, skip_alert, wait_for_record):
    client.load('mirror')

    client.certificate()

    assert 'success' in client.conf('1', 'applications/mirror/processes')

    add_tls(application='mirror')

    (_, sock) = client.post_ssl(
        headers={
            'Host': 'localhost',
            'Connection': 'keep-alive',
        },
        start=True,
        body='0123456789',
        read_timeout=1,
    )

    app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]

    subprocess.check_output(['kill', '-9', app_id])

    skip_alert(fr'process {app_id} exited on signal 9')

    wait_for_record(fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started')

    resp = client.post_ssl(sock=sock, body='0123456789')

    assert resp['status'] == 200, 'application respawn status'
    assert resp['body'] == '0123456789', 'application respawn body'


def test_tls_url_scheme():
    client.load('variables')

    assert (
        client.post(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': '',
                'Connection': 'close',
            }
        )['headers']['Wsgi-Url-Scheme']
        == 'http'
    ), 'url scheme http'

    client.certificate()

    add_tls(application='variables')

    assert (
        client.post_ssl(
            headers={
                'Host': 'localhost',
                'Content-Type': 'text/html',
                'Custom-Header': '',
                'Connection': 'close',
            }
        )['headers']['Wsgi-Url-Scheme']
        == 'https'
    ), 'url scheme https'


def test_tls_big_upload():
    client.load('upload')

    client.certificate()

    add_tls(application='upload')

    filename = 'test.txt'
    data = '0123456789' * 9000

    res = client.post_ssl(
        body={
            'file': {
                'filename': filename,
                'type': 'text/plain',
                'data': io.StringIO(data),
            }
        }
    )
    assert res['status'] == 200, 'status ok'
    assert res['body'] == f'{filename}{data}'


def test_tls_multi_listener():
    client.load('empty')

    client.certificate()

    add_tls()
    add_tls(port=7081)

    assert client.get_ssl()['status'] == 200, 'listener #1'

    assert client.get_ssl(port=7081)['status'] == 200, 'listener #2'