diff options
author | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
---|---|---|
committer | Konstantin Pavlov <thresh@nginx.com> | 2023-08-31 09:41:46 -0700 |
commit | c45c8919c7232eb20023484f6d1fc9f1f50395d8 (patch) | |
tree | cc12eb307c1611494948645e4b487fa06495c3d2 /test/test_tls_session.py | |
parent | 88c90e1c351ab8c5bd487a5cd4b735014b08e271 (diff) | |
parent | 9b22b6957bc87b3df002d0bc691fdae6a20abdac (diff) | |
download | unit-c45c8919c7232eb20023484f6d1fc9f1f50395d8.tar.gz unit-c45c8919c7232eb20023484f6d1fc9f1f50395d8.tar.bz2 |
Merged with the default branch.1.31.0-1
Diffstat (limited to 'test/test_tls_session.py')
-rw-r--r-- | test/test_tls_session.py | 174 |
1 files changed, 94 insertions, 80 deletions
diff --git a/test/test_tls_session.py b/test/test_tls_session.py index 58f11f2d..8b2b04fd 100644 --- a/test/test_tls_session.py +++ b/test/test_tls_session.py @@ -12,115 +12,129 @@ from OpenSSL.SSL import ( Connection, _lib, ) -from unit.applications.tls import TestApplicationTLS +from unit.applications.tls import ApplicationTLS +prerequisites = {'modules': {'openssl': 'any'}} -class TestTLSSession(TestApplicationTLS): - prerequisites = {'modules': {'openssl': 'any'}} +client = ApplicationTLS() - @pytest.fixture(autouse=True) - def setup_method_fixture(self, request): - self.certificate() - assert 'success' in self.conf( - { - "listeners": { - "*:7080": { - "pass": "routes", - "tls": {"certificate": "default", "session": {}}, - } - }, - "routes": [{"action": {"return": 200}}], - "applications": {}, - } - ), 'load application configuration' +@pytest.fixture(autouse=True) +def setup_method_fixture(): + client.certificate() - def add_session(self, cache_size=None, timeout=None): - session = {} + assert 'success' in client.conf( + { + "listeners": { + "*:7080": { + "pass": "routes", + "tls": {"certificate": "default", "session": {}}, + } + }, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' - if cache_size is not None: - session['cache_size'] = cache_size - if timeout is not None: - session['timeout'] = timeout - return self.conf(session, 'listeners/*:7080/tls/session') +def add_session(cache_size=None, timeout=None): + session = {} - def connect(self, ctx=None, session=None): - sock = socket.create_connection(('127.0.0.1', 7080)) + if cache_size is not None: + session['cache_size'] = cache_size + if timeout is not None: + session['timeout'] = timeout - if ctx is None: - ctx = Context(TLSv1_2_METHOD) - ctx.set_session_cache_mode(SESS_CACHE_CLIENT) - ctx.set_options(OP_NO_TICKET) + return client.conf(session, 'listeners/*:7080/tls/session') - client = Connection(ctx, sock) - client.set_connect_state() - if session is not None: - client.set_session(session) +def connect(ctx=None, session=None): + sock = socket.create_connection(('127.0.0.1', 7080)) - client.do_handshake() - client.shutdown() + if ctx is None: + ctx = Context(TLSv1_2_METHOD) + ctx.set_session_cache_mode(SESS_CACHE_CLIENT) + ctx.set_options(OP_NO_TICKET) - return ( - client, - client.get_session(), - ctx, - _lib.SSL_session_reused(client._ssl), - ) + conn = Connection(ctx, sock) + conn.set_connect_state() - def test_tls_session(self): - client, sess, ctx, reused = self.connect() - assert not reused, 'new connection' + if session is not None: + conn.set_session(session) - client, _, _, reused = self.connect(ctx, sess) - assert not reused, 'no cache' + conn.do_handshake() + conn.shutdown() - assert 'success' in self.add_session(cache_size=2) + return ( + conn, + conn.get_session(), + ctx, + _lib.SSL_session_reused(conn._ssl), + ) - client, sess, ctx, reused = self.connect() - assert not reused, 'new connection cache' - client, _, _, reused = self.connect(ctx, sess) - assert reused, 'cache' +@pytest.mark.skipif( + not hasattr(_lib, 'SSL_session_reused'), + reason='session reuse is not supported', +) +def test_tls_session(): + _, sess, ctx, reused = connect() + assert not reused, 'new connection' + + _, _, _, reused = connect(ctx, sess) + assert not reused, 'no cache' + + assert 'success' in add_session(cache_size=2) + + _, sess, ctx, reused = connect() + assert not reused, 'new connection cache' - client, _, _, reused = self.connect(ctx, sess) - assert reused, 'cache 2' + _, _, _, reused = connect(ctx, sess) + assert reused, 'cache' - # check that at least one session of four is not reused + _, _, _, reused = connect(ctx, sess) + assert reused, 'cache 2' - clients = [self.connect() for _ in range(4)] - assert True not in [c[-1] for c in clients], 'cache small all new' + # check that at least one session of four is not reused - clients_again = [self.connect(c[2], c[1]) for c in clients] - assert False in [c[-1] for c in clients_again], 'cache small no reuse' + conns = [connect() for _ in range(4)] + assert True not in [c[-1] for c in conns], 'cache small all new' - # all four sessions are reused + conns_again = [connect(c[2], c[1]) for c in conns] + assert False in [c[-1] for c in conns_again], 'cache small no reuse' - assert 'success' in self.add_session(cache_size=8) + # all four sessions are reused - clients = [self.connect() for _ in range(4)] - assert True not in [c[-1] for c in clients], 'cache big all new' + assert 'success' in add_session(cache_size=8) - clients_again = [self.connect(c[2], c[1]) for c in clients] - assert False not in [c[-1] for c in clients_again], 'cache big reuse' + conns = [connect() for _ in range(4)] + assert True not in [c[-1] for c in conns], 'cache big all new' + + conns_again = [connect(c[2], c[1]) for c in conns] + assert False not in [c[-1] for c in conns_again], 'cache big reuse' + + +@pytest.mark.skipif( + not hasattr(_lib, 'SSL_session_reused'), + reason='session reuse is not supported', +) +def test_tls_session_timeout(): + assert 'success' in add_session(cache_size=5, timeout=1) - def test_tls_session_timeout(self): - assert 'success' in self.add_session(cache_size=5, timeout=1) + _, sess, ctx, reused = connect() + assert not reused, 'new connection' - client, sess, ctx, reused = self.connect() - assert not reused, 'new connection' + _, _, _, reused = connect(ctx, sess) + assert reused, 'no timeout' - client, _, _, reused = self.connect(ctx, sess) - assert reused, 'no timeout' + time.sleep(3) - time.sleep(3) + _, _, _, reused = connect(ctx, sess) + assert not reused, 'timeout' - client, _, _, reused = self.connect(ctx, sess) - assert not reused, 'timeout' - def test_tls_session_invalid(self): - assert 'error' in self.add_session(cache_size=-1) - assert 'error' in self.add_session(cache_size={}) - assert 'error' in self.add_session(timeout=-1) - assert 'error' in self.add_session(timeout={}) +def test_tls_session_invalid(): + assert 'error' in add_session(cache_size=-1) + assert 'error' in add_session(cache_size={}) + assert 'error' in add_session(timeout=-1) + assert 'error' in add_session(timeout={}) |