diff options
Diffstat (limited to '')
-rw-r--r-- | test/test_tls_session.py | 129 |
1 files changed, 129 insertions, 0 deletions
diff --git a/test/test_tls_session.py b/test/test_tls_session.py new file mode 100644 index 00000000..7854fad6 --- /dev/null +++ b/test/test_tls_session.py @@ -0,0 +1,129 @@ +import socket +import time + +import pytest +from OpenSSL.SSL import ( + TLSv1_2_METHOD, + SESS_CACHE_CLIENT, + OP_NO_TICKET, + Context, + Connection, + _lib, +) +from unit.applications.tls import TestApplicationTLS +from unit.option import option + + +class TestTLSSession(TestApplicationTLS): + prerequisites = {'modules': {'openssl': 'any'}} + + @pytest.fixture(autouse=True) + def setup_method_fixture(self, request): + self.certificate() + + assert 'success' in self.conf( + { + "listeners": { + "*:7080": { + "pass": "routes", + "tls": {"certificate": "default", "session": {}}, + } + }, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' + + def add_session(self, cache_size=None, timeout=None): + session = {} + + if cache_size is not None: + session['cache_size'] = cache_size + if timeout is not None: + session['timeout'] = timeout + + return self.conf(session, 'listeners/*:7080/tls/session') + + def connect(self, ctx=None, session=None): + sock = socket.create_connection(('127.0.0.1', 7080)) + + if ctx is None: + ctx = Context(TLSv1_2_METHOD) + ctx.set_session_cache_mode(SESS_CACHE_CLIENT) + ctx.set_options(OP_NO_TICKET) + + client = Connection(ctx, sock) + client.set_connect_state() + + if session is not None: + client.set_session(session) + + client.do_handshake() + client.shutdown() + + return ( + client, + client.get_session(), + ctx, + _lib.SSL_session_reused(client._ssl), + ) + + def test_tls_session(self): + client, sess, ctx, reused = self.connect() + assert not reused, 'new connection' + + client, _, _, reused = self.connect(ctx, sess) + assert not reused, 'no cache' + + assert 'success' in self.add_session(cache_size=1) + + client, sess, ctx, reused = self.connect() + assert not reused, 'new connection cache' + + client, _, _, reused = self.connect(ctx, sess) + assert reused, 'cache' + + client, _, _, reused = self.connect(ctx, sess) + assert reused, 'cache 2' + + # check that at least one session of two is not reused + + client, sess, ctx, reused = self.connect() + client2, sess2, ctx2, reused2 = self.connect() + assert True not in [reused, reused2], 'new connection cache small' + + client, _, _, reused = self.connect(ctx, sess) + client2, _, _, reused2 = self.connect(ctx2, sess2) + assert False in [reused, reused2], 'cache small' + + # both sessions are reused + + assert 'success' in self.add_session(cache_size=2) + + client, sess, ctx, reused = self.connect() + client2, sess2, ctx2, reused2 = self.connect() + assert True not in [reused, reused2], 'new connection cache big' + + client, _, _, reused = self.connect(ctx, sess) + client2, _, _, reused2 = self.connect(ctx2, sess2) + assert False not in [reused, reused2], 'cache big' + + def test_tls_session_timeout(self): + assert 'success' in self.add_session(cache_size=1, timeout=1) + + client, sess, ctx, reused = self.connect() + assert not reused, 'new connection' + + client, _, _, reused = self.connect(ctx, sess) + assert reused, 'no timeout' + + time.sleep(3) + + client, _, _, reused = self.connect(ctx, sess) + assert not reused, 'timeout' + + def test_tls_session_invalid(self): + assert 'error' in self.add_session(cache_size=-1) + assert 'error' in self.add_session(cache_size={}) + assert 'error' in self.add_session(timeout=-1) + assert 'error' in self.add_session(timeout={}) |