summaryrefslogtreecommitdiffhomepage
path: root/test/test_tls_session.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--test/test_tls_session.py174
1 files changed, 94 insertions, 80 deletions
diff --git a/test/test_tls_session.py b/test/test_tls_session.py
index 58f11f2d..8b2b04fd 100644
--- a/test/test_tls_session.py
+++ b/test/test_tls_session.py
@@ -12,115 +12,129 @@ from OpenSSL.SSL import (
Connection,
_lib,
)
-from unit.applications.tls import TestApplicationTLS
+from unit.applications.tls import ApplicationTLS
+prerequisites = {'modules': {'openssl': 'any'}}
-class TestTLSSession(TestApplicationTLS):
- prerequisites = {'modules': {'openssl': 'any'}}
+client = ApplicationTLS()
- @pytest.fixture(autouse=True)
- def setup_method_fixture(self, request):
- self.certificate()
- assert 'success' in self.conf(
- {
- "listeners": {
- "*:7080": {
- "pass": "routes",
- "tls": {"certificate": "default", "session": {}},
- }
- },
- "routes": [{"action": {"return": 200}}],
- "applications": {},
- }
- ), 'load application configuration'
+@pytest.fixture(autouse=True)
+def setup_method_fixture():
+ client.certificate()
- def add_session(self, cache_size=None, timeout=None):
- session = {}
+ assert 'success' in client.conf(
+ {
+ "listeners": {
+ "*:7080": {
+ "pass": "routes",
+ "tls": {"certificate": "default", "session": {}},
+ }
+ },
+ "routes": [{"action": {"return": 200}}],
+ "applications": {},
+ }
+ ), 'load application configuration'
- if cache_size is not None:
- session['cache_size'] = cache_size
- if timeout is not None:
- session['timeout'] = timeout
- return self.conf(session, 'listeners/*:7080/tls/session')
+def add_session(cache_size=None, timeout=None):
+ session = {}
- def connect(self, ctx=None, session=None):
- sock = socket.create_connection(('127.0.0.1', 7080))
+ if cache_size is not None:
+ session['cache_size'] = cache_size
+ if timeout is not None:
+ session['timeout'] = timeout
- if ctx is None:
- ctx = Context(TLSv1_2_METHOD)
- ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
- ctx.set_options(OP_NO_TICKET)
+ return client.conf(session, 'listeners/*:7080/tls/session')
- client = Connection(ctx, sock)
- client.set_connect_state()
- if session is not None:
- client.set_session(session)
+def connect(ctx=None, session=None):
+ sock = socket.create_connection(('127.0.0.1', 7080))
- client.do_handshake()
- client.shutdown()
+ if ctx is None:
+ ctx = Context(TLSv1_2_METHOD)
+ ctx.set_session_cache_mode(SESS_CACHE_CLIENT)
+ ctx.set_options(OP_NO_TICKET)
- return (
- client,
- client.get_session(),
- ctx,
- _lib.SSL_session_reused(client._ssl),
- )
+ conn = Connection(ctx, sock)
+ conn.set_connect_state()
- def test_tls_session(self):
- client, sess, ctx, reused = self.connect()
- assert not reused, 'new connection'
+ if session is not None:
+ conn.set_session(session)
- client, _, _, reused = self.connect(ctx, sess)
- assert not reused, 'no cache'
+ conn.do_handshake()
+ conn.shutdown()
- assert 'success' in self.add_session(cache_size=2)
+ return (
+ conn,
+ conn.get_session(),
+ ctx,
+ _lib.SSL_session_reused(conn._ssl),
+ )
- client, sess, ctx, reused = self.connect()
- assert not reused, 'new connection cache'
- client, _, _, reused = self.connect(ctx, sess)
- assert reused, 'cache'
+@pytest.mark.skipif(
+ not hasattr(_lib, 'SSL_session_reused'),
+ reason='session reuse is not supported',
+)
+def test_tls_session():
+ _, sess, ctx, reused = connect()
+ assert not reused, 'new connection'
+
+ _, _, _, reused = connect(ctx, sess)
+ assert not reused, 'no cache'
+
+ assert 'success' in add_session(cache_size=2)
+
+ _, sess, ctx, reused = connect()
+ assert not reused, 'new connection cache'
- client, _, _, reused = self.connect(ctx, sess)
- assert reused, 'cache 2'
+ _, _, _, reused = connect(ctx, sess)
+ assert reused, 'cache'
- # check that at least one session of four is not reused
+ _, _, _, reused = connect(ctx, sess)
+ assert reused, 'cache 2'
- clients = [self.connect() for _ in range(4)]
- assert True not in [c[-1] for c in clients], 'cache small all new'
+ # check that at least one session of four is not reused
- clients_again = [self.connect(c[2], c[1]) for c in clients]
- assert False in [c[-1] for c in clients_again], 'cache small no reuse'
+ conns = [connect() for _ in range(4)]
+ assert True not in [c[-1] for c in conns], 'cache small all new'
- # all four sessions are reused
+ conns_again = [connect(c[2], c[1]) for c in conns]
+ assert False in [c[-1] for c in conns_again], 'cache small no reuse'
- assert 'success' in self.add_session(cache_size=8)
+ # all four sessions are reused
- clients = [self.connect() for _ in range(4)]
- assert True not in [c[-1] for c in clients], 'cache big all new'
+ assert 'success' in add_session(cache_size=8)
- clients_again = [self.connect(c[2], c[1]) for c in clients]
- assert False not in [c[-1] for c in clients_again], 'cache big reuse'
+ conns = [connect() for _ in range(4)]
+ assert True not in [c[-1] for c in conns], 'cache big all new'
+
+ conns_again = [connect(c[2], c[1]) for c in conns]
+ assert False not in [c[-1] for c in conns_again], 'cache big reuse'
+
+
+@pytest.mark.skipif(
+ not hasattr(_lib, 'SSL_session_reused'),
+ reason='session reuse is not supported',
+)
+def test_tls_session_timeout():
+ assert 'success' in add_session(cache_size=5, timeout=1)
- def test_tls_session_timeout(self):
- assert 'success' in self.add_session(cache_size=5, timeout=1)
+ _, sess, ctx, reused = connect()
+ assert not reused, 'new connection'
- client, sess, ctx, reused = self.connect()
- assert not reused, 'new connection'
+ _, _, _, reused = connect(ctx, sess)
+ assert reused, 'no timeout'
- client, _, _, reused = self.connect(ctx, sess)
- assert reused, 'no timeout'
+ time.sleep(3)
- time.sleep(3)
+ _, _, _, reused = connect(ctx, sess)
+ assert not reused, 'timeout'
- client, _, _, reused = self.connect(ctx, sess)
- assert not reused, 'timeout'
- def test_tls_session_invalid(self):
- assert 'error' in self.add_session(cache_size=-1)
- assert 'error' in self.add_session(cache_size={})
- assert 'error' in self.add_session(timeout=-1)
- assert 'error' in self.add_session(timeout={})
+def test_tls_session_invalid():
+ assert 'error' in add_session(cache_size=-1)
+ assert 'error' in add_session(cache_size={})
+ assert 'error' in add_session(timeout=-1)
+ assert 'error' in add_session(timeout={})