diff options
-rw-r--r-- | test/test_tls_tickets.py | 195 |
1 files changed, 195 insertions, 0 deletions
diff --git a/test/test_tls_tickets.py b/test/test_tls_tickets.py new file mode 100644 index 00000000..0ea113f5 --- /dev/null +++ b/test/test_tls_tickets.py @@ -0,0 +1,195 @@ +import socket + +import pytest +from OpenSSL.SSL import ( + TLSv1_2_METHOD, + Context, + Connection, + Session, + _lib, +) +from unit.applications.tls import TestApplicationTLS +from unit.option import option + + +class TestTLSTicket(TestApplicationTLS): + prerequisites = {'modules': {'openssl': 'any'}} + + ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE' + ticket2 = ( + '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt' + ) + ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\ +49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw=' + + @pytest.fixture(autouse=True) + def setup_method_fixture(self, request): + self.certificate() + + listener_conf = { + "pass": "routes", + "tls": { + "certificate": "default", + "session": {"cache_size": 0, "tickets": True}, + }, + } + + assert 'success' in self.conf( + { + "listeners": { + "*:7080": listener_conf, + "*:7081": listener_conf, + "*:7082": listener_conf, + }, + "routes": [{"action": {"return": 200}}], + "applications": {}, + } + ), 'load application configuration' + + def set_tickets(self, tickets=True, port=7080): + assert 'success' in self.conf( + {"cache_size": 0, "tickets": tickets}, + 'listeners/*:' + str(port) + '/tls/session', + ) + + def connect(self, ctx=None, session=None, port=7080): + sock = socket.create_connection(('127.0.0.1', port)) + + if ctx is None: + ctx = Context(TLSv1_2_METHOD) + + client = Connection(ctx, sock) + client.set_connect_state() + + if session is not None: + client.set_session(session) + + client.do_handshake() + client.shutdown() + + return ( + client.get_session(), + ctx, + _lib.SSL_session_reused(client._ssl), + ) + + def has_ticket(self, sess): + return _lib.SSL_SESSION_has_ticket(sess._session) + + @pytest.mark.skipif( + not hasattr(_lib, 'SSL_SESSION_has_ticket'), + reason='ticket check is not supported', + ) + def test_tls_ticket(self): + sess, ctx, reused = self.connect() + assert self.has_ticket(sess), 'tickets True' + assert not reused, 'tickets True not reused' + + sess, ctx, reused = self.connect(ctx, sess) + assert self.has_ticket(sess), 'tickets True reconnect' + assert reused, 'tickets True reused' + + self.set_tickets(tickets=False) + + sess, _, _ = self.connect() + assert not self.has_ticket(sess), 'tickets False' + + assert 'success' in self.conf_delete( + 'listeners/*:7080/tls/session/tickets' + ), 'tickets default configure' + + sess, _, _ = self.connect() + assert not self.has_ticket(sess), 'tickets default (false)' + + @pytest.mark.skipif( + not hasattr(_lib, 'SSL_SESSION_has_ticket'), + reason='ticket check is not supported', + ) + def test_tls_ticket_string(self): + self.set_tickets(self.ticket) + sess, ctx, _ = self.connect() + assert self.has_ticket(sess), 'tickets string' + + sess2, _, reused = self.connect(ctx, sess) + assert self.has_ticket(sess2), 'tickets string reconnect' + assert reused, 'tickets string reused' + + sess2, _, reused = self.connect(ctx, sess, port=7081) + assert self.has_ticket(sess2), 'connect True' + assert not reused, 'connect True not reused' + + self.set_tickets(self.ticket2, port=7081) + + sess2, _, reused = self.connect(ctx, sess, port=7081) + assert self.has_ticket(sess2), 'wrong ticket' + assert not reused, 'wrong ticket not reused' + + self.set_tickets(self.ticket80) + + sess, ctx, _ = self.connect() + assert self.has_ticket(sess), 'tickets string 80' + + sess2, _, reused = self.connect(ctx, sess) + assert self.has_ticket(sess2), 'tickets string 80 reconnect' + assert reused, 'tickets string 80 reused' + + sess2, _, reused = self.connect(ctx, sess, port=7081) + assert self.has_ticket(sess2), 'wrong ticket 80' + assert not reused, 'wrong ticket 80 not reused' + + @pytest.mark.skipif( + not hasattr(_lib, 'SSL_SESSION_has_ticket'), + reason='ticket check is not supported', + ) + def test_tls_ticket_array(self): + self.set_tickets([]) + + sess, ctx, _ = self.connect() + assert not self.has_ticket(sess), 'tickets array empty' + + self.set_tickets([self.ticket, self.ticket2]) + self.set_tickets(self.ticket, port=7081) + self.set_tickets(self.ticket2, port=7082) + + sess, ctx, _ = self.connect() + _, _, reused = self.connect(ctx, sess, port=7081) + assert not reused, 'not last ticket' + _, _, reused = self.connect(ctx, sess, port=7082) + assert reused, 'last ticket' + + sess, ctx, _ = self.connect(port=7081) + _, _, reused = self.connect(ctx, sess) + assert reused, 'first ticket' + + sess, ctx, _ = self.connect(port=7082) + _, _, reused = self.connect(ctx, sess) + assert reused, 'second ticket' + + assert 'success' in self.conf_delete( + 'listeners/*:7080/tls/session/tickets/0' + ), 'removed first ticket' + assert 'success' in self.conf_post( + '"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets' + ), 'add new ticket to the end of array' + + sess, ctx, _ = self.connect() + _, _, reused = self.connect(ctx, sess, port=7082) + assert not reused, 'not last ticket 2' + _, _, reused = self.connect(ctx, sess, port=7081) + assert reused, 'last ticket 2' + + def test_tls_ticket_invalid(self): + def check_tickets(tickets): + assert 'error' in self.conf( + {"tickets": tickets}, 'listeners/*:7080/tls/session', + ) + + check_tickets({}) + check_tickets('!?&^' * 16) + check_tickets(self.ticket[:-2] + '!' + self.ticket[3:]) + check_tickets(self.ticket[:-1]) + check_tickets(self.ticket + 'b') + check_tickets(self.ticket + 'blah') + check_tickets([True, self.ticket, self.ticket2]) + check_tickets([self.ticket, 'blah', self.ticket2]) + check_tickets([self.ticket, self.ticket2, []]) |