# Source: root/test/test_tls_session.py
# blob: 997b71484e7d3db9d58d4c9fdfc5390da1537a3c
1
2
3
4
5
6



             

                                  








                                                    
 
                                               
 
 
                                         
                                 
                                   

















































                                                                         
                                             

                                           
                                                 

                                     
                                                          
 
                                             

                                                 
                                                 

                              
                                                 

                                
                                                               
 

                                                                          
 

                                                                              
 
                                      
 
                                                          
 

                                                                        
 

                                                                             

                                       
                                                                     
 
                                             

                                           
                                                 



                                   
                                                 






                                                         
import socket
import time

import pytest

pytest.importorskip('OpenSSL.SSL')
from OpenSSL.SSL import (
    TLSv1_2_METHOD,
    SESS_CACHE_CLIENT,
    OP_NO_TICKET,
    Context,
    Connection,
    _lib,
)
from unit.applications.tls import TestApplicationTLS

# NOTE(review): presumably read by the test harness to decide whether this
# module can run (an OpenSSL-enabled module of any version) — confirm against
# the harness, which is not visible from this file.
prerequisites = {'modules': {'openssl': 'any'}}


class TestTLSSession(TestApplicationTLS):
    """Tests for the ``tls/session`` object of the ``*:7080`` listener.

    The tests configure the ``cache_size`` and ``timeout`` session options
    and verify, from the client side, whether a previously negotiated TLS
    session is reused on a subsequent connection.
    """

    @pytest.fixture(autouse=True)
    def setup_method_fixture(self):
        """Load the baseline configuration before every test.

        The listener on ``*:7080`` uses the ``default`` certificate with an
        empty ``session`` object and passes to a single route that returns
        status 200.
        """
        # NOTE(review): presumably creates/uploads the "default" certificate
        # referenced by the listener below — confirm in TestApplicationTLS.
        self.certificate()

        assert 'success' in self.conf(
            {
                "listeners": {
                    "*:7080": {
                        "pass": "routes",
                        "tls": {"certificate": "default", "session": {}},
                    }
                },
                "routes": [{"action": {"return": 200}}],
                "applications": {},
            }
        ), 'load application configuration'

    def add_session(self, cache_size=None, timeout=None):
        """Replace the listener's ``tls/session`` object.

        Only the options that are not ``None`` are included, so calling this
        without arguments applies an empty session object.

        Args:
            cache_size: value for the ``cache_size`` option, or ``None`` to
                omit it.
            timeout: value for the ``timeout`` option, or ``None`` to omit it.

        Returns:
            Whatever ``self.conf`` returns for the update; the tests check it
            for the ``'success'`` or ``'error'`` key.
        """
        session = {}

        if cache_size is not None:
            session['cache_size'] = cache_size
        if timeout is not None:
            session['timeout'] = timeout

        return self.conf(session, 'listeners/*:7080/tls/session')

    def connect(self, ctx=None, session=None):
        """Perform a TLS handshake with the listener and shut the link down.

        Args:
            ctx: an existing ``OpenSSL.SSL.Context`` to reuse.  When ``None``
                a new TLSv1.2 context is created with client-side session
                caching enabled and session tickets disabled.
            session: a session object from an earlier connection to offer for
                resumption, or ``None`` to start a fresh session.

        Returns:
            A 4-tuple ``(client, session, ctx, reused)``: the connection
            object, the session negotiated on it, the context that was used,
            and the result of ``SSL_session_reused`` for the connection
            (truthy when the offered session was resumed).

        Raises:
            Any exception raised while setting up the connection or during
            the handshake/shutdown; the underlying socket is closed first.
        """
        sock = socket.create_connection(('127.0.0.1', 7080))

        try:
            if ctx is None:
                ctx = Context(TLSv1_2_METHOD)
                ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
                # Tickets are turned off so the client offers the session
                # itself rather than a session ticket.
                ctx.set_options(OP_NO_TICKET)

            client = Connection(ctx, sock)
            client.set_connect_state()

            if session is not None:
                client.set_session(session)

            client.do_handshake()
            client.shutdown()
        except Exception:
            # Do not leak the TCP socket when setup or the handshake fails.
            # On success the socket is left open on purpose: the connection
            # object is handed back to the caller.
            sock.close()
            raise

        return (
            client,
            client.get_session(),
            ctx,
            _lib.SSL_session_reused(client._ssl),
        )

    def test_tls_session(self):
        """Sessions are reused only when a large enough cache is configured."""
        # With the baseline (empty) session object nothing is resumed.
        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'no cache'

        assert 'success' in self.add_session(cache_size=2)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'cache 2'

        # check that at least one session of four is not reused

        clients = [self.connect() for _ in range(4)]
        assert not any(c[-1] for c in clients), 'cache small all new'

        # Each tuple is (client, session, ctx, reused): reconnect with the
        # same context and session to attempt resumption.
        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert not all(c[-1] for c in clients_again), 'cache small no reuse'

        # all four sessions are reused

        assert 'success' in self.add_session(cache_size=8)

        clients = [self.connect() for _ in range(4)]
        assert not any(c[-1] for c in clients), 'cache big all new'

        clients_again = [self.connect(c[2], c[1]) for c in clients]
        assert all(c[-1] for c in clients_again), 'cache big reuse'

    def test_tls_session_timeout(self):
        """A cached session stops being reusable after the timeout passes."""
        assert 'success' in self.add_session(cache_size=5, timeout=1)

        _, sess, ctx, reused = self.connect()
        assert not reused, 'new connection'

        _, _, _, reused = self.connect(ctx, sess)
        assert reused, 'no timeout'

        # Wait well past the configured timeout of 1 (presumably seconds —
        # confirm against the server documentation).
        time.sleep(3)

        _, _, _, reused = self.connect(ctx, sess)
        assert not reused, 'timeout'

    def test_tls_session_invalid(self):
        """Negative and object values are rejected for both options."""
        assert 'error' in self.add_session(cache_size=-1)
        assert 'error' in self.add_session(cache_size={})
        assert 'error' in self.add_session(timeout=-1)
        assert 'error' in self.add_session(timeout={})